You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2017/03/01 14:32:58 UTC

[15/50] [abbrv] ignite git commit: Implemented.

Implemented.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64ba13b0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64ba13b0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64ba13b0

Branch: refs/heads/master
Commit: 64ba13b0a3be6acbf7d629029b460a39c2e2b388
Parents: 4eac51c
Author: AMRepo <an...@gmail.com>
Authored: Mon Feb 20 21:24:29 2017 +0300
Committer: Andrey V. Mashenkov <an...@gmail.com>
Committed: Tue Feb 21 11:52:40 2017 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       |  48 ++++
 .../processors/cache/GridCacheProcessor.java    |   3 +
 .../processors/cache/IgniteCacheProxy.java      |   6 +-
 .../closure/GridClosureProcessor.java           |   2 +-
 .../processors/query/GridQueryIndexing.java     |  27 +-
 .../processors/query/GridQueryProcessor.java    | 141 +++-------
 .../messages/GridQueryNextPageRequest.java      |  29 +-
 .../messages/GridQueryNextPageResponse.java     |  29 +-
 .../cache/query/GridCacheTwoStepQuery.java      |  17 ++
 .../processors/query/h2/IgniteH2Indexing.java   | 235 ++++++++++++++--
 .../query/h2/opt/DistributedJoinMode.java       |  51 ++++
 .../query/h2/opt/GridH2IndexBase.java           | 264 +++++++++++++-----
 .../query/h2/opt/GridH2QueryContext.java        |  84 ++++--
 .../query/h2/opt/GridH2TreeIndex.java           | 232 ++++++++++++----
 .../query/h2/twostep/GridMapQueryExecutor.java  | 227 +++++++++++----
 .../query/h2/twostep/GridMergeIndex.java        |  39 ++-
 .../h2/twostep/GridReduceQueryExecutor.java     |  69 +++--
 .../h2/twostep/msg/GridH2IndexRangeRequest.java |  60 +++-
 .../twostep/msg/GridH2IndexRangeResponse.java   |  62 ++++-
 .../h2/twostep/msg/GridH2QueryRequest.java      |   5 +
 .../query/IgniteSqlSegmentedIndexSelfTest.java  | 263 ++++++++++++++++++
 .../query/IgniteSqlSplitterSelfTest.java        | 139 +++++++++-
 .../h2/GridIndexingSpiAbstractSelfTest.java     |  26 +-
 .../FetchingQueryCursorStressTest.java          | 277 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite.java          |   2 +
 25 files changed, 1917 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 0656dda..149f25a 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -223,6 +223,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Default threshold for concurrent loading of keys from {@link CacheStore}. */
     public static final int DFLT_CONCURRENT_LOAD_ALL_THRESHOLD = 5;
 
+    /** Default SQL query parallelism level */
+    public static final int DFLT_SQL_QUERY_PARALLELISM_LVL = 1;
+
     /** Cache name. */
     private String name;
 
@@ -410,6 +413,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     /** Query entities. */
     private Collection<QueryEntity> qryEntities;
 
+    /** */
+    private int qryParallelism = DFLT_SQL_QUERY_PARALLELISM_LVL;
+
     /** Empty constructor (all values are initialized to their defaults). */
     public CacheConfiguration() {
         /* No-op. */
@@ -462,6 +468,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         interceptor = cc.getInterceptor();
         invalidate = cc.isInvalidate();
         isReadThrough = cc.isReadThrough();
+        qryParallelism = cc.getQueryParallelism();
         isWriteThrough = cc.isWriteThrough();
         storeKeepBinary = cc.isStoreKeepBinary() != null ? cc.isStoreKeepBinary() : DFLT_STORE_KEEP_BINARY;
         listenerConfigurations = cc.listenerConfigurations;
@@ -2108,6 +2115,47 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
     }
 
     /**
+     * Defines a hint to query execution engine on desired degree of parallelism within a single node.
+     * Query executor may or may not use this hint depending on estimated query costs. Query executor may define
+     * certain restrictions on parallelism depending on query type and/or cache type.
+     * <p>
+     * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions:
+     * <ul>
+     *     <li>Hint cannot be used for {@code REPLICATED} cache, exception is thrown otherwise</li>
+     *     <li>All caches participating in query must have the same degree of parallelism, exception is thrown
+     *     otherwise</li>
+     * </ul>
+     * These restrictions will be removed in future versions of Apache Ignite.
+     * <p>
+     * Defaults to {@code 1}.
+     */
+    public int getQueryParallelism() {
+        return qryParallelism;
+    }
+
+    /**
+     * Defines a hint to query execution engine on desired degree of parallelism within a single node.
+     * Query executor may or may not use this hint depending on estimated query costs. Query executor may define
+     * certain restrictions on parallelism depending on query type and/or cache type.
+     * <p>
+     * As of {@code Apache Ignite 1.9} this hint is only supported for SQL queries with the following restrictions:
+     * <ul>
+     *     <li>Hint cannot be used for {@code REPLICATED} cache, exception is thrown otherwise</li>
+     *     <li>All caches participating in query must have the same degree of parallelism, exception is thrown
+     *     otherwise</li>
+     * </ul>
+     * These restrictions will be removed in future versions of Apache Ignite.
+     *
+     * @param qryParallelism Query parallelizm level.
+     * @return {@code this} for chaining.
+     */
+    public CacheConfiguration<K,V> setQueryParallelism(int qryParallelism) {
+        this.qryParallelism = qryParallelism;
+
+        return this;
+    }
+
+    /**
      * Gets topology validator.
      * <p>
      * See {@link TopologyValidator} for details.

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7093403..c3e3f3b 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -269,6 +269,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (cfg.getCacheMode() == REPLICATED)
             cfg.setBackups(Integer.MAX_VALUE);
 
+        if( cfg.getQueryParallelism() > 1 && cfg.getCacheMode() != PARTITIONED)
+            throw new IgniteCheckedException("Cache index segmentation is supported for PARTITIONED mode only.");
+
         if (cfg.getAffinityMapper() == null)
             cfg.setAffinityMapper(cacheObjCtx.defaultAffMapper());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 1381670..f806d05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -729,12 +729,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
                 final SqlQuery p = (SqlQuery)qry;
 
                 if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
-                    return (QueryCursor<R>)new QueryCursorImpl<>(new Iterable<Cache.Entry<K, V>>() {
-                        @Override public Iterator<Cache.Entry<K, V>> iterator() {
-                            return ctx.kernalContext().query().queryLocal(ctx, p,
+                     return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p,
                                 opCtxCall != null && opCtxCall.isKeepBinary());
-                        }
-                    });
 
                 return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 20fb6a0..61ed8a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -902,7 +902,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @return Future.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    private <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc)
+    public <R> IgniteInternalFuture<R> callLocal(@Nullable final Callable<R> c, byte plc)
         throws IgniteCheckedException {
         if (c == null)
             return new GridFinishedFuture<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index ca04724..37f0ade 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -84,35 +84,26 @@ public interface GridQueryIndexing {
     /**
      * Queries individual fields (generally used by JDBC drivers).
      *
-     * @param spaceName Space name.
+     * @param cctx Cache context.
      * @param qry Query.
-     * @param params Query parameters.
      * @param filter Space name and key filter.
-     * @param enforceJoinOrder Enforce join order of tables in the query.
-     * @param timeout Query timeout in milliseconds.
      * @param cancel Query cancel.
-     * @return Query result.
-     * @throws IgniteCheckedException If failed.
+     * @return Cursor.
      */
-    public GridQueryFieldsResult queryLocalSqlFields(@Nullable String spaceName, String qry,
-        Collection<Object> params, IndexingQueryFilter filter, boolean enforceJoinOrder, int timeout,
-        GridQueryCancel cancel) throws IgniteCheckedException;
+    public <K, V> QueryCursor<List<?>> queryLocalSqlFields(GridCacheContext<?, ?> cctx, SqlFieldsQuery qry,
+        IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException;
 
     /**
      * Executes regular query.
      *
-     * @param spaceName Space name.
+     * @param cctx Cache context.
      * @param qry Query.
-     * @param alias Table alias used in Query.
-     * @param params Query parameters.
-     * @param type Query return type.
      * @param filter Space name and key filter.
-     * @return Queried rows.
-     * @throws IgniteCheckedException If failed.
+     * @param keepBinary Keep binary flag.
+     * @return Cursor.
      */
-    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName, String qry,
-        String alias, Collection<Object> params, GridQueryTypeDescriptor type, IndexingQueryFilter filter)
-        throws IgniteCheckedException;
+    public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(GridCacheContext<?, ?> cctx, SqlQuery qry,
+        IndexingQueryFilter filter, boolean keepBinary) throws IgniteCheckedException;
 
     /**
      * Executes text query.

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index ee9224b..85744d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -754,42 +754,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 INDEXING.module() + " to classpath or moving it from 'optional' to 'libs' folder).");
     }
 
-    /**
-     * @param space Space.
-     * @param clause Clause.
-     * @param params Parameters collection.
-     * @param resType Result type.
-     * @param filters Filters.
-     * @return Key/value rows.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> query(final String space, final String clause,
-        final Collection<Object> params, final String resType, final IndexingQueryFilter filters)
-        throws IgniteCheckedException {
-        checkEnabled();
-
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to execute query (grid is stopping).");
-
-        try {
-            final GridCacheContext<?, ?> cctx = ctx.cache().internalCache(space).context();
-
-            return executeQuery(GridCacheQueryType.SQL_FIELDS, clause, cctx, new IgniteOutClosureX<GridCloseableIterator<IgniteBiTuple<K, V>>>() {
-                @Override public GridCloseableIterator<IgniteBiTuple<K, V>> applyx() throws IgniteCheckedException {
-                    TypeDescriptor type = typesByName.get(new TypeName(space, resType));
-
-                    if (type == null || !type.registered())
-                        throw new CacheException("Failed to find SQL table for type: " + resType);
-
-                    return idx.queryLocalSql(space, clause, null, params, type, filters);
-                }
-            }, false);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
 
     /**
      * @param cctx Cache context.
@@ -829,11 +793,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
-            return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
-                @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
-                    return idx.queryTwoStep(cctx, qry);
-                }
-            }, true);
+            return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
+                new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
+                    @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
+                        return idx.queryTwoStep(cctx, qry);
+                    }
+                }, true);
         }
         catch (IgniteCheckedException e) {
             throw new IgniteException(e);
@@ -849,7 +814,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
      * @param keepBinary Keep binary flag.
      * @return Cursor.
      */
-    public <K, V> Iterator<Cache.Entry<K, V>> queryLocal(
+    public <K, V> QueryCursor<Cache.Entry<K, V>> queryLocal(
         final GridCacheContext<?, ?> cctx,
         final SqlQuery qry,
         final boolean keepBinary
@@ -859,54 +824,25 @@ public class GridQueryProcessor extends GridProcessorAdapter {
 
         try {
             return executeQuery(GridCacheQueryType.SQL, qry.getSql(), cctx,
-                new IgniteOutClosureX<Iterator<Cache.Entry<K, V>>>() {
-                    @Override public Iterator<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
-                        String space = cctx.name();
+                new IgniteOutClosureX<QueryCursor<Cache.Entry<K, V>>>() {
+                    @Override public QueryCursor<Cache.Entry<K, V>> applyx() throws IgniteCheckedException {
                         String type = qry.getType();
-                        String sqlQry = qry.getSql();
-                        Object[] params = qry.getArgs();
 
-                        TypeDescriptor typeDesc = typesByName.get(
-                            new TypeName(
-                                space,
+                        GridQueryProcessor.TypeDescriptor typeDesc = typesByName.get(
+                            new GridQueryProcessor.TypeName(
+                                cctx.name(),
                                 type));
 
                         if (typeDesc == null || !typeDesc.registered())
                             throw new CacheException("Failed to find SQL table for type: " + type);
 
-                        final GridCloseableIterator<IgniteBiTuple<K, V>> i = idx.queryLocalSql(
-                            space,
-                            qry.getSql(),
-                            qry.getAlias(),
-                            F.asList(params),
-                            typeDesc,
-                            idx.backupFilter(requestTopVer.get(), null));
+                        qry.setType(typeDesc.name());
 
                         sendQueryExecutedEvent(
-                            sqlQry,
-                            params);
-
-                        return new ClIter<Cache.Entry<K, V>>() {
-                            @Override public void close() throws Exception {
-                                i.close();
-                            }
-
-                            @Override public boolean hasNext() {
-                                return i.hasNext();
-                            }
-
-                            @Override public Cache.Entry<K, V> next() {
-                                IgniteBiTuple<K, V> t = i.next();
-
-                                return new CacheEntryImpl<>(
-                                    (K)cctx.unwrapBinaryIfNeeded(t.getKey(), keepBinary, false),
-                                    (V)cctx.unwrapBinaryIfNeeded(t.getValue(), keepBinary, false));
-                            }
-
-                            @Override public void remove() {
-                                throw new UnsupportedOperationException();
-                            }
-                        };
+                            qry.getSql(),
+                            qry.getArgs());
+
+                        return idx.queryLocalSql(cctx, qry, idx.backupFilter(requestTopVer.get(), null), keepBinary);
                     }
                 }, true);
         }
@@ -994,13 +930,6 @@ public class GridQueryProcessor extends GridProcessorAdapter {
     }
 
     /**
-     * Closeable iterator.
-     */
-    private interface ClIter<X> extends AutoCloseable, Iterator<X> {
-        // No-op.
-    }
-
-    /**
      * @param cctx Cache context.
      * @param qry Query.
      * @return Iterator.
@@ -1010,34 +939,26 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             throw new IllegalStateException("Failed to execute query (grid is stopping).");
 
         try {
-            final boolean keepBinary = cctx.keepBinary();
-
             return executeQuery(GridCacheQueryType.SQL_FIELDS, qry.getSql(), cctx, new IgniteOutClosureX<QueryCursor<List<?>>>() {
                 @Override public QueryCursor<List<?>> applyx() throws IgniteCheckedException {
-                    final String space = cctx.name();
-                    final String sql = qry.getSql();
-                    final Object[] args = qry.getArgs();
-                    final GridQueryCancel cancel = new GridQueryCancel();
+                    GridQueryCancel cancel = new GridQueryCancel();
 
-                    final GridQueryFieldsResult res = idx.queryLocalSqlFields(space, sql, F.asList(args),
-                        idx.backupFilter(requestTopVer.get(), null), qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+                    final QueryCursor<List<?>> cursor = idx.queryLocalSqlFields(cctx, qry,
+                        idx.backupFilter(requestTopVer.get(), null), cancel);
 
-                    QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                    return new QueryCursorImpl<List<?>>(new Iterable<List<?>>() {
                         @Override public Iterator<List<?>> iterator() {
-                            try {
-                                sendQueryExecutedEvent(sql, args);
-
-                                return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
-                            }
-                            catch (IgniteCheckedException e) {
-                                throw new IgniteException(e);
-                            }
-                        }
-                    }, cancel);
-
-                    cursor.fieldsMeta(res.metaData());
+                            sendQueryExecutedEvent(qry.getSql(), qry.getArgs());
 
-                    return cursor;
+                            return cursor.iterator();
+                        }
+                    }, cancel) {
+                        @Override public List<GridQueryFieldMetadata> fieldsMeta() {
+                            if (cursor instanceof QueryCursorImpl)
+                                return ((QueryCursorImpl)cursor).fieldsMeta();
+                            return super.fieldsMeta();
+                        }
+                    };
                 }
             }, true);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
index 1feff5a..acea084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageRequest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep.messages;
 
-
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -35,6 +34,9 @@ public class GridQueryNextPageRequest implements Message {
     private long qryReqId;
 
     /** */
+    private int segmentId;
+
+    /** */
     private int qry;
 
     /** */
@@ -50,11 +52,13 @@ public class GridQueryNextPageRequest implements Message {
     /**
      * @param qryReqId Query request ID.
      * @param qry Query.
+     * @param segmentId Index segment ID.
      * @param pageSize Page size.
      */
-    public GridQueryNextPageRequest(long qryReqId, int qry, int pageSize) {
+    public GridQueryNextPageRequest(long qryReqId, int qry, int segmentId, int pageSize) {
         this.qryReqId = qryReqId;
         this.qry = qry;
+        this.segmentId = segmentId;
         this.pageSize = pageSize;
     }
 
@@ -72,6 +76,11 @@ public class GridQueryNextPageRequest implements Message {
         return qry;
     }
 
+    /** @return Index segment ID */
+    public int segmentId() {
+        return segmentId;
+    }
+
     /**
      * @return Page size.
      */
@@ -119,6 +128,12 @@ public class GridQueryNextPageRequest implements Message {
 
                 writer.incrementState();
 
+            case 3:
+                if (!writer.writeInt("segmentId", segmentId))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -156,6 +171,14 @@ public class GridQueryNextPageRequest implements Message {
 
                 reader.incrementState();
 
+            case 3:
+                segmentId = reader.readInt("segmentId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridQueryNextPageRequest.class);
@@ -168,6 +191,6 @@ public class GridQueryNextPageRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 3;
+        return 4;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 4889069..e85c00b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -42,6 +42,9 @@ public class GridQueryNextPageResponse implements Message {
     private long qryReqId;
 
     /** */
+    private int segmentId;
+
+    /** */
     private int qry;
 
     /** */
@@ -73,6 +76,7 @@ public class GridQueryNextPageResponse implements Message {
 
     /**
      * @param qryReqId Query request ID.
+     * @param segmentId Index segment ID.
      * @param qry Query.
      * @param page Page.
      * @param allRows All rows count.
@@ -80,12 +84,13 @@ public class GridQueryNextPageResponse implements Message {
      * @param vals Values for rows in this page added sequentially.
      * @param plainRows Not marshalled rows for local node.
      */
-    public GridQueryNextPageResponse(long qryReqId, int qry, int page, int allRows, int cols,
+    public GridQueryNextPageResponse(long qryReqId, int segmentId, int qry, int page, int allRows, int cols,
         Collection<Message> vals, Collection<?> plainRows) {
         assert vals != null ^ plainRows != null;
         assert cols > 0 : cols;
 
         this.qryReqId = qryReqId;
+        this.segmentId = segmentId;
         this.qry = qry;
         this.page = page;
         this.allRows = allRows;
@@ -102,6 +107,13 @@ public class GridQueryNextPageResponse implements Message {
     }
 
     /**
+     * @return Index segment ID.
+     */
+    public int segmentId() {
+        return segmentId;
+    }
+
+    /**
      * @return Query.
      */
     public int query() {
@@ -202,6 +214,12 @@ public class GridQueryNextPageResponse implements Message {
 
                 writer.incrementState();
 
+            case 7:
+                if (!writer.writeInt("segmentId", segmentId))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -271,6 +289,13 @@ public class GridQueryNextPageResponse implements Message {
 
                 reader.incrementState();
 
+            case 7:
+                segmentId = reader.readInt("segmentId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return reader.afterMessageRead(GridQueryNextPageResponse.class);
@@ -283,7 +308,7 @@ public class GridQueryNextPageResponse implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 7;
+        return 8;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index f53936f..c127eeb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -69,6 +69,9 @@ public class GridCacheTwoStepQuery {
     /** */
     private List<Integer> extraCaches;
 
+    /** */
+    private boolean local;
+
     /**
      * @param originalSql Original query SQL.
      * @param schemas Schema names in query.
@@ -229,6 +232,20 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @return {@code True} If query is local.
+     */
+    public boolean isLocal() {
+        return local;
+    }
+
+    /**
+     * @param local Local query flag.
+     */
+    public void local(boolean local) {
+        this.local = local;
+    }
+
+    /**
      * @param args New arguments to copy with.
      * @return Copy.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index e4b0c1f..2f40d87 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -77,11 +77,13 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -92,6 +94,7 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexing;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@ -187,6 +190,8 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.FULLTEXT;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.GEO_SPATIAL;
 import static org.apache.ignite.internal.processors.query.GridQueryIndexType.SORTED;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.distributedJoinMode;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.PREPARE;
@@ -810,10 +815,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             removeTable(tbl);
     }
 
-    /** {@inheritDoc} */
+    /**
+     * Queries individual fields (generally used by JDBC drivers).
+     *
+     * @param spaceName Space name.
+     * @param qry Query.
+     * @param params Query parameters.
+     * @param filter Space name and key filter.
+     * @param enforceJoinOrder Enforce join order of tables in the query.
+     * @param timeout Query timeout in milliseconds.
+     * @param cancel Query cancel.
+     * @return Query result.
+     * @throws IgniteCheckedException If failed.
+     */
     @SuppressWarnings("unchecked")
-    @Override public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
-        @Nullable final Collection<Object> params, final IndexingQueryFilter filters, boolean enforceJoinOrder,
+    public GridQueryFieldsResult queryLocalSqlFields(@Nullable final String spaceName, final String qry,
+        @Nullable final Collection<Object> params, final IndexingQueryFilter filter, boolean enforceJoinOrder,
         final int timeout, final GridQueryCancel cancel)
         throws IgniteCheckedException {
         final Connection conn = connectionForSpace(spaceName);
@@ -833,7 +850,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             fldsQry.setEnforceJoinOrder(enforceJoinOrder);
             fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
 
-            return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
+            return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filter, cancel);
         }
 
         List<GridQueryFieldMetadata> meta;
@@ -846,7 +863,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         }
 
         final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL)
-            .filter(filters).distributedJoins(false);
+            .filter(filter).distributedJoinMode(OFF);
 
         return new GridQueryFieldsResultAdapter(meta, null) {
             @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
@@ -1099,14 +1116,113 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName,
-        final String qry, String alias, @Nullable final Collection<Object> params, GridQueryTypeDescriptor type,
-        final IndexingQueryFilter filter) throws IgniteCheckedException {
-        final TableDescriptor tbl = tableDescriptor(spaceName, type);
+    @Override public <K, V> QueryCursor<List<?>> queryLocalSqlFields(final GridCacheContext<?, ?> cctx,
+        final SqlFieldsQuery qry, final IndexingQueryFilter filter, final GridQueryCancel cancel)
+        throws IgniteCheckedException {
+
+        if (cctx.config().getQueryParallelism() > 1) {
+            qry.setDistributedJoins(true);
+
+            assert qry.isLocal();
+
+            return queryTwoStep(cctx, qry, cancel);
+        }
+        else {
+            final boolean keepBinary = cctx.keepBinary();
+
+            final String space = cctx.name();
+            final String sql = qry.getSql();
+            final Object[] args = qry.getArgs();
+
+            final GridQueryFieldsResult res = queryLocalSqlFields(space, sql, F.asList(args), filter,
+                qry.isEnforceJoinOrder(), qry.getTimeout(), cancel);
+
+            QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    try {
+                        return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            }, cancel);
+
+            cursor.fieldsMeta(res.metaData());
+
+            return cursor;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> QueryCursor<Cache.Entry<K,V>> queryLocalSql(final GridCacheContext<?, ?> cctx,
+        final SqlQuery qry, final IndexingQueryFilter filter, final boolean keepBinary) throws IgniteCheckedException {
+        if (cctx.config().getQueryParallelism() > 1) {
+            qry.setDistributedJoins(true);
+
+            assert qry.isLocal();
+
+            return queryTwoStep(cctx, qry);
+        }
+        else {
+            String space = cctx.name();
+            String type = qry.getType();
+            String sqlQry = qry.getSql();
+            String alias = qry.getAlias();
+            Object[] params = qry.getArgs();
+
+            GridQueryCancel cancel = new GridQueryCancel();
+
+            final GridCloseableIterator<IgniteBiTuple<K, V>> i = queryLocalSql(space, sqlQry, alias,
+                F.asList(params), type, filter, cancel);
+
+            return new QueryCursorImpl<Cache.Entry<K, V>>(new Iterable<Cache.Entry<K, V>>() {
+                @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                    return new ClIter<Cache.Entry<K, V>>() {
+                        @Override public void close() throws Exception {
+                            i.close();
+                        }
+
+                        @Override public boolean hasNext() {
+                            return i.hasNext();
+                        }
+
+                        @Override public Cache.Entry<K, V> next() {
+                            IgniteBiTuple<K, V> t = i.next();
+
+                            return new CacheEntryImpl<>(
+                                (K)cctx.unwrapBinaryIfNeeded(t.get1(), keepBinary, false),
+                                (V)cctx.unwrapBinaryIfNeeded(t.get2(), keepBinary, false));
+                        }
+
+                        @Override public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+            }, cancel);
+        }
+    }
+
+    /**
+     * Executes regular query.
+     *
+     * @param spaceName Space name.
+     * @param qry Query.
+     * @param alias Table alias.
+     * @param params Query parameters.
+     * @param type Query return type.
+     * @param filter Space name and key filter.
+     * @return Queried rows.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <K, V> GridCloseableIterator<IgniteBiTuple<K, V>> queryLocalSql(@Nullable String spaceName,
+        final String qry, String alias, @Nullable final Collection<Object> params, String type,
+        final IndexingQueryFilter filter, GridQueryCancel cancel) throws IgniteCheckedException {
+        final TableDescriptor tbl = tableDescriptor(type, spaceName);
 
         if (tbl == null)
-            throw new IgniteSQLException("Failed to find SQL table for type: " + type.name(),
+            throw new IgniteSQLException("Failed to find SQL table for type: " + type,
                 IgniteQueryErrorCode.TABLE_NOT_FOUND);
 
         String sql = generateQuery(qry, alias, tbl);
@@ -1115,7 +1231,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         setupConnection(conn, false, false);
 
-        GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
+        GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter)
+            .distributedJoinMode(OFF));
 
         GridRunningQueryInfo run = new GridRunningQueryInfo(qryIdGen.incrementAndGet(), qry, SQL, spaceName,
             U.currentTimeMillis(), null, true);
@@ -1123,7 +1240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         runs.put(run.id(), run);
 
         try {
-            ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, null);
+            ResultSet rs = executeSqlQueryWithTimer(spaceName, conn, sql, params, true, 0, cancel);
 
             return new KeyValIterator(rs);
         }
@@ -1178,8 +1295,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         fqry.setArgs(qry.getArgs());
         fqry.setPageSize(qry.getPageSize());
         fqry.setDistributedJoins(qry.isDistributedJoins());
+        fqry.setLocal(qry.isLocal());
 
-        if(qry.getTimeout() > 0)
+        if (qry.getTimeout() > 0)
             fqry.setTimeout(qry.getTimeout(), TimeUnit.MILLISECONDS);
 
         final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry, null);
@@ -1234,11 +1352,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         final boolean distributedJoins = qry.isDistributedJoins() && cctx.isPartitioned();
         final boolean grpByCollocated = qry.isCollocated();
 
+        final DistributedJoinMode distributedJoinMode = distributedJoinMode(qry.isLocal(), distributedJoins);
+
         GridCacheTwoStepQuery twoStepQry;
         List<GridQueryFieldMetadata> meta;
 
         final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated,
-            distributedJoins, enforceJoinOrder);
+            distributedJoins, enforceJoinOrder, qry.isLocal());
         TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
 
         if (cachedQry != null) {
@@ -1251,7 +1371,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             setupConnection(c, distributedJoins, enforceJoinOrder);
 
             GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
-                .distributedJoins(distributedJoins));
+                .distributedJoinMode(distributedJoinMode));
 
             PreparedStatement stmt;
 
@@ -1286,9 +1406,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 GridH2QueryContext.clearThreadLocal();
             }
 
-            Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt);
+            Prepared prepared = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
 
-            if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery) qry).isQuery() != prepared.isQuery())
+            if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != prepared.isQuery())
                 throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
                     IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
 
@@ -1341,8 +1461,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     extraCaches = null;
                 }
 
+                //Prohibit usage indices with different numbers of segments in same query.
+                checkCacheIndexSegmentation(caches);
+
                 twoStepQry.caches(caches);
                 twoStepQry.extraCaches(extraCaches);
+                twoStepQry.local(qry.isLocal());
 
                 meta = meta(stmt.getMetaData());
             }
@@ -1380,6 +1504,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @throws IllegalStateException if segmented indices used with non-segmented indices.
+     */
+    private void checkCacheIndexSegmentation(List<Integer> caches) {
+        if (caches.isEmpty())
+            return; //Nnothing to check
+
+        GridCacheSharedContext sharedContext = ctx.cache().context();
+
+        int expectedParallelism = 0;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = sharedContext.cacheContext(caches.get(i));
+
+            assert cctx != null;
+
+            if(!cctx.isPartitioned())
+                continue;
+
+            if(expectedParallelism == 0)
+                expectedParallelism = cctx.config().getQueryParallelism();
+            else if (expectedParallelism != 0 && cctx.config().getQueryParallelism() != expectedParallelism)
+                throw new IllegalStateException("Using indexes with different parallelism levels in same query is forbidden.");
+        }
+    }
+
+    /**
      * Prepares statement for query.
      *
      * @param qry Query string.
@@ -1669,7 +1819,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private void cleanupStatementCache() {
         long cur = U.currentTimeMillis();
 
-        for(Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
+        for (Iterator<Map.Entry<Thread, StatementCache>> it = stmtCache.entrySet().iterator(); it.hasNext(); ) {
             Map.Entry<Thread, StatementCache> entry = it.next();
 
             Thread t = entry.getKey();
@@ -1877,6 +2027,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         for (ClusterNode node : nodes) {
             if (node.isLocal()) {
+                if (locNode != null)
+                    throw new IllegalStateException();
+
                 locNode = node;
 
                 continue;
@@ -2163,23 +2316,29 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         /** */
         private final boolean enforceJoinOrder;
 
+        /** */
+        private final boolean isLocal;
+
         /**
          * @param space Space.
          * @param sql Sql.
          * @param grpByCollocated Collocated GROUP BY.
          * @param distributedJoins Distributed joins enabled.
          * @param enforceJoinOrder Enforce join order of tables.
+         * @param isLocal Query is local flag.
          */
         private TwoStepCachedQueryKey(String space,
             String sql,
             boolean grpByCollocated,
             boolean distributedJoins,
-            boolean enforceJoinOrder) {
+            boolean enforceJoinOrder,
+            boolean isLocal) {
             this.space = space;
             this.sql = sql;
             this.grpByCollocated = grpByCollocated;
             this.distributedJoins = distributedJoins;
             this.enforceJoinOrder = enforceJoinOrder;
+            this.isLocal = isLocal;
         }
 
         /** {@inheritDoc} */
@@ -2204,7 +2363,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (space != null ? !space.equals(that.space) : that.space != null)
                 return false;
 
-            return sql.equals(that.sql);
+            return isLocal == that.isLocal && sql.equals(that.sql);
         }
 
         /** {@inheritDoc} */
@@ -2212,8 +2371,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             int res = space != null ? space.hashCode() : 0;
             res = 31 * res + sql.hashCode();
             res = 31 * res + (grpByCollocated ? 1 : 0);
-            res = 31 * res + (distributedJoins ? 1 : 0);
-            res = 31 * res + (enforceJoinOrder ? 1 : 0);
+            res = res + (distributedJoins ? 2 : 0);
+            res = res + (enforceJoinOrder ? 4 : 0);
+            res = res + (isLocal ? 8 : 0);
 
             return res;
         }
@@ -2572,7 +2732,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 affCol = null;
 
             // Add primary key index.
-            idxs.add(new GridH2TreeIndex("_key_PK", tbl, true,
+            idxs.add(createTreeIndex("_key_PK", tbl, true,
                 treeIndexColumns(new ArrayList<IndexColumn>(2), keyCol, affCol)));
 
             if (type().valueClass() == String.class) {
@@ -2618,7 +2778,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
                         cols = treeIndexColumns(cols, keyCol, affCol);
 
-                        idxs.add(new GridH2TreeIndex(name, tbl, false, cols));
+                        idxs.add(createTreeIndex(name, tbl, false, cols));
                     }
                     else if (idx.type() == GEO_SPATIAL)
                         idxs.add(createH2SpatialIndex(tbl, name, cols.toArray(new IndexColumn[cols.size()])));
@@ -2629,7 +2789,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             // Add explicit affinity key index if nothing alike was found.
             if (affCol != null && !affIdxFound) {
-                idxs.add(new GridH2TreeIndex("AFFINITY_KEY", tbl, false,
+                idxs.add(createTreeIndex("AFFINITY_KEY", tbl, false,
                     treeIndexColumns(new ArrayList<IndexColumn>(2), affCol, keyCol)));
             }
 
@@ -2676,6 +2836,22 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 throw new IgniteException("Failed to instantiate: " + className, e);
             }
         }
+
+        /**
+         * @param idxName Index name.
+         * @param tbl Table.
+         * @param pk Primary key flag.
+         * @param columns Index column list.
+         * @return
+         */
+        private Index createTreeIndex(String idxName, GridH2Table tbl, boolean pk, List<IndexColumn> columns) {
+            GridCacheContext<?, ?> cctx = tbl.rowDescriptor().context();
+
+            if (cctx != null && cctx.config().getQueryParallelism() > 1)
+                return new GridH2TreeIndex(idxName, tbl, pk, columns, cctx.config().getQueryParallelism());
+
+            return new GridH2TreeIndex(idxName, tbl, pk, columns, 1);
+        }
     }
 
     /**
@@ -2729,6 +2905,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Closeable iterator.
+     */
+    private interface ClIter<X> extends AutoCloseable, Iterator<X> {
+        // No-op.
+    }
+
+    /**
      * Field descriptor.
      */
     static class SqlFieldMetadata implements GridQueryFieldMetadata {

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
new file mode 100644
index 0000000..cc06244
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/DistributedJoinMode.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.opt;
+
+/**
+ * Defines set of distributed join modes.
+ */
+public enum DistributedJoinMode {
+    /**
+     * Distributed joins is disabled. Local joins will be performed instead.
+     */
+    OFF,
+
+    /**
+     * Distributed joins is enabled within local node only.
+     *
+     * NOTE: This mode is used with segmented indices for local sql queries.
+     * As in this case we need to make distributed join across local index segments
+     * and prevent range-queries to other nodes.
+     */
+    LOCAL_ONLY,
+
+    /**
+     * Distributed joins is enabled.
+     */
+    ON;
+
+    /**
+     * @param isLocal Query local flag.
+     * @param distributedJoins Query distributed joins flag.
+     * @return DistributedJoinMode for the query.
+     */
+    public static DistributedJoinMode distributedJoinMode(boolean isLocal, boolean distributedJoins) {
+        return distributedJoins ? (isLocal ? LOCAL_ONLY : ON) : OFF;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index bab219c..131e03b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -81,6 +81,8 @@ import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.emptyIterator;
 import static java.util.Collections.singletonList;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.LOCAL_ONLY;
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2CollocationModel.buildCollocationModel;
@@ -178,6 +180,13 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
+     * @return Index segment ID for current query context.
+     */
+    protected int threadLocalSegment() {
+       return 0;
+    }
+
+    /**
      * If the index supports rebuilding it has to creates its own copy.
      *
      * @return Rebuilt copy.
@@ -252,7 +261,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
         // because on run stage reordering of joined tables by Optimizer is explicitly disabled
         // and thus multiplier will be always the same, so it will not affect choice of index.
         // Query expressions can not be distributed as well.
-        if (qctx == null || qctx.type() != PREPARE || !qctx.distributedJoins() || ses.isPreparingQueryExpression())
+        if (qctx == null || qctx.type() != PREPARE || qctx.distributedJoinMode() == OFF || ses.isPreparingQueryExpression())
             return GridH2CollocationModel.MULTIPLIER_COLLOCATED;
 
         // We have to clear this cache because normally sub-query plan cost does not depend on anything
@@ -363,7 +372,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
     @Override public IndexLookupBatch createLookupBatch(TableFilter filter) {
         GridH2QueryContext qctx = GridH2QueryContext.get();
 
-        if (qctx == null || !qctx.distributedJoins() || !getTable().isPartitioned())
+        if (qctx == null || qctx.distributedJoinMode() == OFF || !getTable().isPartitioned())
             return null;
 
         IndexColumn affCol = getTable().getAffinityKeyColumn();
@@ -381,9 +390,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
             ucast = false;
         }
 
-        GridCacheContext<?,?> cctx = getTable().rowDescriptor().context();
+        GridCacheContext<?, ?> cctx = getTable().rowDescriptor().context();
 
-        return new DistributedLookupBatch(cctx, ucast, affColId);
+        boolean isLocal = qctx.distributedJoinMode() == LOCAL_ONLY;
+
+        return new DistributedLookupBatch(cctx, ucast, affColId, isLocal);
     }
 
     /**
@@ -437,18 +448,18 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param node Requesting node.
      * @param msg Request message.
      */
-    private void onIndexRangeRequest(ClusterNode node, GridH2IndexRangeRequest msg) {
-        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
-            msg.originNodeId(),
-            msg.queryId(),
-            MAP);
-
+    private void onIndexRangeRequest(final ClusterNode node, final GridH2IndexRangeRequest msg) {
         GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
 
         res.originNodeId(msg.originNodeId());
         res.queryId(msg.queryId());
+        res.originSegmentId(msg.originSegmentId());
+        res.segment(msg.segment());
         res.batchLookupId(msg.batchLookupId());
 
+        GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(), msg.originNodeId(),
+            msg.queryId(), msg.originSegmentId(), MAP);
+
         if (qctx == null)
             res.status(STATUS_NOT_FOUND);
         else {
@@ -461,11 +472,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
                     assert !msg.bounds().isEmpty() : "empty bounds";
 
-                    src = new RangeSource(msg.bounds(), snapshot0, qctx.filter());
+                    src = new RangeSource(msg.bounds(), msg.segment(), snapshot0, qctx.filter());
                 }
                 else {
                     // This is request to fetch next portion of data.
-                    src = qctx.getSource(node.id(), msg.batchLookupId());
+                    src = qctx.getSource(node.id(), msg.segment(), msg.batchLookupId());
 
                     assert src != null;
                 }
@@ -491,11 +502,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 if (src.hasMoreRows()) {
                     // Save source for future fetches.
                     if (msg.bounds() != null)
-                        qctx.putSource(node.id(), msg.batchLookupId(), src);
+                        qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), src);
                 }
                 else if (msg.bounds() == null) {
                     // Drop saved source.
-                    qctx.putSource(node.id(), msg.batchLookupId(), null);
+                    qctx.putSource(node.id(), msg.segment(), msg.batchLookupId(), null);
                 }
 
                 assert !ranges.isEmpty();
@@ -520,17 +531,17 @@ public abstract class GridH2IndexBase extends BaseIndex {
      */
     private void onIndexRangeResponse(ClusterNode node, GridH2IndexRangeResponse msg) {
         GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
-            msg.originNodeId(), msg.queryId(), MAP);
+            msg.originNodeId(), msg.queryId(), msg.originSegmentId(), MAP);
 
         if (qctx == null)
             return;
 
-        Map<ClusterNode, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
+        Map<SegmentKey, RangeStream> streams = qctx.getStreams(msg.batchLookupId());
 
         if (streams == null)
             return;
 
-        RangeStream stream = streams.get(node);
+        RangeStream stream = streams.get(new SegmentKey(node, msg.segment()));
 
         assert stream != null;
 
@@ -549,47 +560,69 @@ public abstract class GridH2IndexBase extends BaseIndex {
     /**
      * @param qctx Query context.
      * @param batchLookupId Batch lookup ID.
+     * @param segmentId Segment ID.
      * @return Index range request.
      */
-    private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId) {
+    private static GridH2IndexRangeRequest createRequest(GridH2QueryContext qctx, int batchLookupId, int segmentId) {
         GridH2IndexRangeRequest req = new GridH2IndexRangeRequest();
 
         req.originNodeId(qctx.originNodeId());
         req.queryId(qctx.queryId());
+        req.originSegmentId(qctx.segment());
+        req.segment(segmentId);
         req.batchLookupId(batchLookupId);
 
         return req;
     }
 
+
     /**
      * @param qctx Query context.
      * @param cctx Cache context.
+     * @param isLocalQry Local query flag.
      * @return Collection of nodes for broadcasting.
      */
-    private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) {
+    private List<SegmentKey> broadcastSegments(GridH2QueryContext qctx, GridCacheContext<?, ?> cctx, boolean isLocalQry) {
         Map<UUID, int[]> partMap = qctx.partitionsMap();
 
-        List<ClusterNode> res;
+        List<ClusterNode> nodes;
+
+        if (isLocalQry) {
+            if (partMap != null && !partMap.containsKey(cctx.localNodeId()))
+                return Collections.<SegmentKey>emptyList(); // Prevent remote index call for local queries.
 
-        if (partMap == null)
-            res = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+            nodes = Collections.singletonList(cctx.localNode());
+        }
         else {
-            res = new ArrayList<>(partMap.size());
+            if (partMap == null)
+                nodes = new ArrayList<>(CU.affinityNodes(cctx, qctx.topologyVersion()));
+            else {
+                nodes = new ArrayList<>(partMap.size());
 
-            GridKernalContext ctx = kernalContext();
+                GridKernalContext ctx = kernalContext();
 
-            for (UUID nodeId : partMap.keySet()) {
-                ClusterNode node = ctx.discovery().node(nodeId);
+                for (UUID nodeId : partMap.keySet()) {
+                    ClusterNode node = ctx.discovery().node(nodeId);
 
-                if (node == null)
-                    throw new GridH2RetryException("Failed to find node.");
+                    if (node == null)
+                        throw new GridH2RetryException("Failed to find node.");
 
-                res.add(node);
+                    nodes.add(node);
+                }
             }
+
+            if (F.isEmpty(nodes))
+                throw new GridH2RetryException("Failed to collect affinity nodes.");
         }
 
-        if (F.isEmpty(res))
-            throw new GridH2RetryException("Failed to collect affinity nodes.");
+        int segmentsCount = segmentsCount();
+
+        List<SegmentKey> res = new ArrayList<>(nodes.size() * segmentsCount);
+
+        for (ClusterNode node : nodes) {
+            for (int seg = 0; seg < segmentsCount; seg++)
+                res.add(new SegmentKey(node, seg));
+        }
 
         return res;
     }
@@ -598,26 +631,81 @@ public abstract class GridH2IndexBase extends BaseIndex {
      * @param cctx Cache context.
      * @param qctx Query context.
      * @param affKeyObj Affinity key.
-     * @return Cluster nodes or {@code null} if affinity key is a null value.
+     * @param isLocalQry Local query flag.
+     * @return Segment key for Affinity key.
      */
-    private ClusterNode rangeNode(GridCacheContext<?,?> cctx, GridH2QueryContext qctx, Object affKeyObj) {
+    private SegmentKey rangeSegment(GridCacheContext<?, ?> cctx, GridH2QueryContext qctx, Object affKeyObj, boolean isLocalQry) {
         assert affKeyObj != null && affKeyObj != EXPLICIT_NULL : affKeyObj;
 
         ClusterNode node;
 
-        if (qctx.partitionsMap() != null) {
-            // If we have explicit partitions map, we have to use it to calculate affinity node.
-            UUID nodeId = qctx.nodeForPartition(cctx.affinity().partition(affKeyObj), cctx);
+        int partition = cctx.affinity().partition(affKeyObj);
+
+        if (isLocalQry) {
+            if (qctx.partitionsMap() != null) {
+                // If we have explicit partitions map, we have to use it to calculate affinity node.
+                UUID nodeId = qctx.nodeForPartition(partition, cctx);
+
+                if(!cctx.localNodeId().equals(nodeId))
+                    return null; // Prevent remote index call for local queries.
+            }
+
+            if (!cctx.affinity().primaryByKey(cctx.localNode(), partition, qctx.topologyVersion()))
+                return null;
+
+            node = cctx.localNode();
+        }
+        else{
+            if (qctx.partitionsMap() != null) {
+                // If we have explicit partitions map, we have to use it to calculate affinity node.
+                UUID nodeId = qctx.nodeForPartition(partition, cctx);
 
             node = cctx.discovery().node(nodeId);
         }
         else // Get primary node for current topology version.
             node = cctx.affinity().primaryByKey(affKeyObj, qctx.topologyVersion());
 
-        if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
-            throw new GridH2RetryException("Failed to find node.");
+            if (node == null) // Node was not found, probably topology changed and we need to retry the whole query.
+                throw new GridH2RetryException("Failed to find node.");
+        }
+
+        return new SegmentKey(node, segment(partition));
+    }
+
+    /** */
+    protected class SegmentKey {
+        /** */
+        final ClusterNode node;
+
+        /** */
+        final int segmentId;
+
+        SegmentKey(ClusterNode node, int segmentId) {
+            assert node != null;
+
+            this.node = node;
+            this.segmentId = segmentId;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            SegmentKey key = (SegmentKey)o;
+
+            return segmentId == key.segmentId && node.id().equals(key.node.id());
 
-        return node;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = node.hashCode();
+            result = 31 * result + segmentId;
+            return result;
+        }
     }
 
     /**
@@ -740,6 +828,20 @@ public abstract class GridH2IndexBase extends BaseIndex {
         return database.createRow(vals0, MEMORY_CALCULATE);
     }
 
+    /** @return Index segments count. */
+    protected int segmentsCount() {
+        return 1;
+    }
+
+    /**
+     * @param partition Partition idx.
+     * @return Segment ID for given key
+     */
+    protected int segment(int partition) {
+        return 0;
+    }
+
+
     /**
      * Simple cursor from a single node.
      */
@@ -752,14 +854,14 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         /**
          * @param rangeId Range ID.
-         * @param nodes Remote nodes.
+         * @param keys Remote index segment keys.
          * @param rangeStreams Range streams.
          */
-        private UnicastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
-            assert nodes.size() == 1;
+        UnicastCursor(int rangeId, List<SegmentKey> keys, Map<SegmentKey, RangeStream> rangeStreams) {
+            assert keys.size() == 1;
 
             this.rangeId = rangeId;
-            this.stream = rangeStreams.get(F.first(nodes));
+            this.stream = rangeStreams.get(F.first(keys));
 
             assert stream != null;
         }
@@ -803,20 +905,19 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
         /**
          * @param rangeId Range ID.
-         * @param nodes Remote nodes.
+         * @param segmentKeys Remote nodes.
          * @param rangeStreams Range streams.
          */
-        private BroadcastCursor(int rangeId, Collection<ClusterNode> nodes, Map<ClusterNode,RangeStream> rangeStreams) {
-            assert nodes.size() > 1;
+        BroadcastCursor(int rangeId, Collection<SegmentKey> segmentKeys, Map<SegmentKey, RangeStream> rangeStreams) {
 
             this.rangeId = rangeId;
 
-            streams = new RangeStream[nodes.size()];
+            streams = new RangeStream[segmentKeys.size()];
 
             int i = 0;
 
-            for (ClusterNode node : nodes) {
-                RangeStream stream = rangeStreams.get(node);
+            for (SegmentKey segmentKey : segmentKeys) {
+                RangeStream stream = rangeStreams.get(segmentKey);
 
                 assert stream != null;
 
@@ -928,16 +1029,19 @@ public abstract class GridH2IndexBase extends BaseIndex {
         final int affColId;
 
         /** */
+        private final boolean localQuery;
+
+        /** */
         GridH2QueryContext qctx;
 
         /** */
         int batchLookupId;
 
         /** */
-        Map<ClusterNode, RangeStream> rangeStreams = Collections.emptyMap();
+        Map<SegmentKey, RangeStream> rangeStreams = Collections.emptyMap();
 
         /** */
-        List<ClusterNode> broadcastNodes;
+        List<SegmentKey> broadcastSegments;
 
         /** */
         List<Future<Cursor>> res = Collections.emptyList();
@@ -952,11 +1056,13 @@ public abstract class GridH2IndexBase extends BaseIndex {
          * @param cctx Cache Cache context.
          * @param ucast Unicast or broadcast query.
          * @param affColId Affinity column ID.
+         * @param localQuery Local query flag.
          */
-        private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) {
+        DistributedLookupBatch(GridCacheContext<?, ?> cctx, boolean ucast, int affColId, boolean localQuery) {
             this.cctx = cctx;
             this.ucast = ucast;
             this.affColId = affColId;
+            this.localQuery = localQuery;
         }
 
         /**
@@ -1028,7 +1134,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
             Object affKey = affColId == -1 ? null : getAffinityKey(firstRow, lastRow);
 
-            List<ClusterNode> nodes;
+            List<SegmentKey> segmentKeys;
             Future<Cursor> fut;
 
             if (affKey != null) {
@@ -1036,17 +1142,20 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 if (affKey == EXPLICIT_NULL) // Affinity key is explicit null, we will not find anything.
                     return false;
 
-                nodes = F.asList(rangeNode(cctx, qctx, affKey));
+                segmentKeys = F.asList(rangeSegment(cctx, qctx, affKey, localQuery));
             }
             else {
                 // Affinity key is not provided or is not the same in upper and lower bounds, we have to broadcast.
-                if (broadcastNodes == null)
-                    broadcastNodes = broadcastNodes(qctx, cctx);
+                if (broadcastSegments == null)
+                    broadcastSegments = broadcastSegments(qctx, cctx, localQuery);
 
-                nodes = broadcastNodes;
+                segmentKeys = broadcastSegments;
             }
 
-            assert !F.isEmpty(nodes) : nodes;
+            if (localQuery && segmentKeys.isEmpty())
+                return false; // Nothing to do
+
+            assert !F.isEmpty(segmentKeys) : segmentKeys;
 
             final int rangeId = res.size();
 
@@ -1058,21 +1167,21 @@ public abstract class GridH2IndexBase extends BaseIndex {
             GridH2RowRangeBounds rangeBounds = rangeBounds(rangeId, first, last);
 
             // Add range to every message of every participating node.
-            for (int i = 0; i < nodes.size(); i++) {
-                ClusterNode node = nodes.get(i);
-                assert node != null;
+            for (int i = 0; i < segmentKeys.size(); i++) {
+                SegmentKey segmentKey = segmentKeys.get(i);
+                assert segmentKey != null;
 
-                RangeStream stream = rangeStreams.get(node);
+                RangeStream stream = rangeStreams.get(segmentKey);
 
                 List<GridH2RowRangeBounds> bounds;
 
                 if (stream == null) {
-                    stream = new RangeStream(qctx, node);
+                    stream = new RangeStream(qctx, segmentKey.node);
 
-                    stream.req = createRequest(qctx, batchLookupId);
+                    stream.req = createRequest(qctx, batchLookupId, segmentKey.segmentId);
                     stream.req.bounds(bounds = new ArrayList<>());
 
-                    rangeStreams.put(node, stream);
+                    rangeStreams.put(segmentKey, stream);
                 }
                 else
                     bounds = stream.req.bounds();
@@ -1084,9 +1193,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
                     batchFull = true;
             }
 
-            fut = new DoneFuture<>(nodes.size() == 1 ?
-                new UnicastCursor(rangeId, nodes, rangeStreams) :
-                new BroadcastCursor(rangeId, nodes, rangeStreams));
+            fut = new DoneFuture<>(segmentKeys.size() == 1 ?
+                new UnicastCursor(rangeId, segmentKeys, rangeStreams) :
+                new BroadcastCursor(rangeId, segmentKeys, rangeStreams));
 
             res.add(fut);
 
@@ -1138,7 +1247,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
             batchLookupId = 0;
 
             rangeStreams = Collections.emptyMap();
-            broadcastNodes = null;
+            broadcastSegments = null;
             batchFull = false;
             findCalled = false;
             res = Collections.emptyList();
@@ -1244,7 +1353,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
                             if (remainingRanges > 0) {
                                 if (req.bounds() != null)
-                                    req = createRequest(qctx, req.batchLookupId());
+                                    req = createRequest(qctx, req.batchLookupId(), req.segment());
 
                                 // Prefetch next page.
                                 send(singletonList(node), req);
@@ -1366,6 +1475,9 @@ public abstract class GridH2IndexBase extends BaseIndex {
         final ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree;
 
         /** */
+        private final int segment;
+
+        /** */
         final IndexingQueryFilter filter;
 
         /**
@@ -1375,9 +1487,11 @@ public abstract class GridH2IndexBase extends BaseIndex {
          */
         RangeSource(
             Iterable<GridH2RowRangeBounds> bounds,
+            int segment,
             ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> tree,
             IndexingQueryFilter filter
         ) {
+            this.segment = segment;
             this.filter = filter;
             this.tree = tree;
             boundsIter = bounds.iterator();
@@ -1435,7 +1549,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 SearchRow first = toSearchRow(bounds.first());
                 SearchRow last = toSearchRow(bounds.last());
 
-                ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> t = tree != null ? tree : treeForRead();
+                ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = tree != null ? tree : treeForRead(segment);
 
                 curRange = doFind0(t, first, true, last, filter);
 
@@ -1452,9 +1566,10 @@ public abstract class GridH2IndexBase extends BaseIndex {
     }
 
     /**
-     * @return Snapshot for current thread if there is one.
+     * @param segment Segment Id.
+     * @return Snapshot for requested segment if there is one.
      */
-    protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead() {
+    protected ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> treeForRead(int segment) {
         throw new UnsupportedOperationException();
     }
 
@@ -1505,7 +1620,8 @@ public abstract class GridH2IndexBase extends BaseIndex {
                 this.fltr = qryFilter.forSpace(spaceName);
 
                 this.isValRequired = qryFilter.isValueRequired();
-            } else {
+            }
+            else {
                 this.fltr = null;
 
                 this.isValRequired = false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/64ba13b0/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 19ea2b2..a7ee0dc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -32,6 +32,7 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 
 /**
@@ -79,7 +80,7 @@ public class GridH2QueryContext {
     private UUID[] partsNodes;
 
     /** */
-    private boolean distributedJoins;
+    private DistributedJoinMode distributedJoinMode;
 
     /** */
     private int pageSize;
@@ -94,7 +95,22 @@ public class GridH2QueryContext {
      * @param type Query type.
      */
     public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
-        key = new Key(locNodeId, nodeId, qryId, type);
+        assert type != MAP;
+
+        key = new Key(locNodeId, nodeId, qryId, 0, type);
+    }
+
+    /**
+     * @param locNodeId Local node ID.
+     * @param nodeId The node who initiated the query.
+     * @param qryId The query ID.
+     * @param segmentId Index segment ID.
+     * @param type Query type.
+     */
+    public GridH2QueryContext(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
+        assert segmentId == 0 || type == MAP;
+
+        key = new Key(locNodeId, nodeId, qryId, segmentId, type);
     }
 
     /**
@@ -133,20 +149,20 @@ public class GridH2QueryContext {
     }
 
     /**
-     * @param distributedJoins Distributed joins can be run in this query.
+     * @param distributedJoinMode Distributed join mode.
      * @return {@code this}.
      */
-    public GridH2QueryContext distributedJoins(boolean distributedJoins) {
-        this.distributedJoins = distributedJoins;
+    public GridH2QueryContext distributedJoinMode(DistributedJoinMode distributedJoinMode) {
+        this.distributedJoinMode = distributedJoinMode;
 
         return this;
     }
 
     /**
-     * @return {@code true} If distributed joins can be run in this query.
+     * @return Distributed join mode.
      */
-    public boolean distributedJoins() {
-        return distributedJoins;
+    public DistributedJoinMode distributedJoinMode() {
+        return distributedJoinMode;
     }
 
     /**
@@ -226,6 +242,11 @@ public class GridH2QueryContext {
         return nodeIds[p];
     }
 
+    /** @return index segment ID. */
+    public int segment() {
+        return key.segmentId;
+    }
+
     /**
      * @param idxId Index ID.
      * @param snapshot Index snapshot.
@@ -303,11 +324,12 @@ public class GridH2QueryContext {
 
     /**
      * @param ownerId Owner node ID.
+     * @param segmentId Index segment ID.
      * @param batchLookupId Batch lookup ID.
      * @param src Range source.
      */
-    public synchronized void putSource(UUID ownerId, int batchLookupId, Object src) {
-        SourceKey srcKey = new SourceKey(ownerId, batchLookupId);
+    public synchronized void putSource(UUID ownerId, int segmentId, int batchLookupId, Object src) {
+        SourceKey srcKey = new SourceKey(ownerId, segmentId, batchLookupId);
 
         if (src != null) {
             if (sources == null)
@@ -321,15 +343,16 @@ public class GridH2QueryContext {
 
     /**
      * @param ownerId Owner node ID.
+     * @param segmentId Index segment ID.
      * @param batchLookupId Batch lookup ID.
      * @return Range source.
      */
     @SuppressWarnings("unchecked")
-    public synchronized <T> T getSource(UUID ownerId, int batchLookupId) {
+    public synchronized <T> T getSource(UUID ownerId, int segmentId, int batchLookupId) {
         if (sources == null)
             return null;
 
-        return (T)sources.get(new SourceKey(ownerId, batchLookupId));
+        return (T)sources.get(new SourceKey(ownerId, segmentId, batchLookupId));
     }
 
     /**
@@ -356,7 +379,7 @@ public class GridH2QueryContext {
          assert qctx.get() == null;
 
          // We need MAP query context to be available to other threads to run distributed joins.
-         if (x.key.type == MAP && x.distributedJoins() && qctxs.putIfAbsent(x.key, x) != null)
+         if (x.key.type == MAP && x.distributedJoinMode() != OFF && qctxs.putIfAbsent(x.key, x) != null)
              throw new IllegalStateException("Query context is already set.");
 
          qctx.set(x);
@@ -381,7 +404,14 @@ public class GridH2QueryContext {
      * @return {@code True} if context was found.
      */
     public static boolean clear(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
-        return doClear(new Key(locNodeId, nodeId, qryId, type), false);
+        boolean res = false;
+
+        for (Key key : qctxs.keySet()) {
+            if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId) && key.qryId == qryId && key.type == type)
+                res |= doClear(new Key(locNodeId, nodeId, qryId, key.segmentId, type), false);
+        }
+
+        return res;
     }
 
     /**
@@ -463,6 +493,7 @@ public class GridH2QueryContext {
      * @param locNodeId Local node ID.
      * @param nodeId The node who initiated the query.
      * @param qryId The query ID.
+     * @param segmentId Index segment ID.
      * @param type Query type.
      * @return Query context.
      */
@@ -470,9 +501,10 @@ public class GridH2QueryContext {
         UUID locNodeId,
         UUID nodeId,
         long qryId,
+        int segmentId,
         GridH2QueryType type
     ) {
-        return qctxs.get(new Key(locNodeId, nodeId, qryId, type));
+        return qctxs.get(new Key(locNodeId, nodeId, qryId, segmentId, type));
     }
 
     /**
@@ -528,15 +560,19 @@ public class GridH2QueryContext {
         private final long qryId;
 
         /** */
+        private final int segmentId;
+
+        /** */
         private final GridH2QueryType type;
 
         /**
          * @param locNodeId Local node ID.
          * @param nodeId The node who initiated the query.
          * @param qryId The query ID.
+         * @param segmentId Index segment ID.
          * @param type Query type.
          */
-        private Key(UUID locNodeId, UUID nodeId, long qryId, GridH2QueryType type) {
+        private Key(UUID locNodeId, UUID nodeId, long qryId, int segmentId, GridH2QueryType type) {
             assert locNodeId != null;
             assert nodeId != null;
             assert type != null;
@@ -544,6 +580,7 @@ public class GridH2QueryContext {
             this.locNodeId = locNodeId;
             this.nodeId = nodeId;
             this.qryId = qryId;
+            this.segmentId = segmentId;
             this.type = type;
         }
 
@@ -568,6 +605,7 @@ public class GridH2QueryContext {
             res = 31 * res + nodeId.hashCode();
             res = 31 * res + (int)(qryId ^ (qryId >>> 32));
             res = 31 * res + type.hashCode();
+            res = 31 * res + segmentId;
 
             return res;
         }
@@ -586,14 +624,19 @@ public class GridH2QueryContext {
         UUID ownerId;
 
         /** */
+        int segmentId;
+
+        /** */
         int batchLookupId;
 
         /**
          * @param ownerId Owner node ID.
+         * @param segmentId Index segment ID.
          * @param batchLookupId Batch lookup ID.
          */
-        SourceKey(UUID ownerId, int batchLookupId) {
+        SourceKey(UUID ownerId, int segmentId, int batchLookupId) {
             this.ownerId = ownerId;
+            this.segmentId = segmentId;
             this.batchLookupId = batchLookupId;
         }
 
@@ -601,12 +644,15 @@ public class GridH2QueryContext {
         @Override public boolean equals(Object o) {
             SourceKey srcKey = (SourceKey)o;
 
-            return batchLookupId == srcKey.batchLookupId && ownerId.equals(srcKey.ownerId);
+            return batchLookupId == srcKey.batchLookupId && segmentId == srcKey.segmentId &&
+                ownerId.equals(srcKey.ownerId);
         }
 
         /** {@inheritDoc} */
         @Override public int hashCode() {
-            return 31 * ownerId.hashCode() + batchLookupId;
+            int hash = ownerId.hashCode();
+            hash = 31 * hash + segmentId;
+            return 31 * hash + batchLookupId;
         }
     }
 }