You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/23 12:09:49 UTC

[1/8] ignite git commit: IGNITE-5054: SQL: Simplified query descriptor, partially removed dependencies on 1-to-1 cache-schema dependency. This closes #1962.

Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-pds aaf9f45b2 -> b86104be7


IGNITE-5054: SQL: Simplified query descriptor, partially removed dependencies on 1-to-1 cache-schema dependency. This closes #1962.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f74d51cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f74d51cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f74d51cb

Branch: refs/heads/ignite-5075-pds
Commit: f74d51cbf9a62858718c5d04b0857a3b0ef32c65
Parents: 992c976
Author: devozerov <vo...@gridgain.com>
Authored: Mon May 22 11:43:14 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon May 22 11:43:14 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   2 +
 .../cache/query/GridCacheTwoStepQuery.java      |  84 ++-----
 .../processors/cache/query/QueryTable.java      | 164 +++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java   |  84 +++----
 .../query/h2/opt/GridH2IndexBase.java           |   2 +-
 .../processors/query/h2/opt/GridH2Table.java    |  28 ++-
 .../query/h2/sql/GridSqlQuerySplitter.java      |  21 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  15 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 233 +++++++++----------
 .../h2/twostep/msg/GridH2QueryRequest.java      |  13 +-
 .../twostep/msg/GridH2ValueMessageFactory.java  |   4 +
 11 files changed, 385 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 17e4a01..753d8af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -178,6 +178,8 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            // -54 is reserved for SQL.
+
             case -53:
                 msg = new SchemaOperationStatusMessage();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 0e31dc0..9e9a875 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.cache.query;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -49,13 +48,7 @@ public class GridCacheTwoStepQuery {
     private String originalSql;
 
     /** */
-    private Collection<String> spaces;
-
-    /** */
-    private Set<String> schemas;
-
-    /** */
-    private Set<String> tbls;
+    private Set<QueryTable> tbls;
 
     /** */
     private boolean distributedJoins;
@@ -64,22 +57,17 @@ public class GridCacheTwoStepQuery {
     private boolean skipMergeTbl;
 
     /** */
-    private List<Integer> caches;
-
-    /** */
-    private List<Integer> extraCaches;
+    private List<Integer> cacheIds;
 
     /** */
     private boolean local;
 
     /**
      * @param originalSql Original query SQL.
-     * @param schemas Schema names in query.
      * @param tbls Tables in query.
      */
-    public GridCacheTwoStepQuery(String originalSql, Set<String> schemas, Set<String> tbls) {
+    public GridCacheTwoStepQuery(String originalSql, Set<QueryTable> tbls) {
         this.originalSql = originalSql;
-        this.schemas = schemas;
         this.tbls = tbls;
     }
 
@@ -157,8 +145,8 @@ public class GridCacheTwoStepQuery {
     public boolean isReplicatedOnly() {
         assert !mapQrys.isEmpty();
 
-        for (int i = 0; i < mapQrys.size(); i++) {
-            if (mapQrys.get(i).isPartitioned())
+        for (GridCacheSqlQuery mapQry : mapQrys) {
+            if (mapQry.isPartitioned())
                 return false;
         }
 
@@ -187,31 +175,17 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @return Caches.
+     * @return Cache IDs.
      */
-    public List<Integer> caches() {
-        return caches;
+    public List<Integer> cacheIds() {
+        return cacheIds;
     }
 
     /**
-     * @param caches Caches.
+     * @param cacheIds Cache IDs.
      */
-    public void caches(List<Integer> caches) {
-        this.caches = caches;
-    }
-
-    /**
-     * @return Caches.
-     */
-    public List<Integer> extraCaches() {
-        return extraCaches;
-    }
-
-    /**
-     * @param extraCaches Caches.
-     */
-    public void extraCaches(List<Integer> extraCaches) {
-        this.extraCaches = extraCaches;
+    public void cacheIds(List<Integer> cacheIds) {
+        this.cacheIds = cacheIds;
     }
 
     /**
@@ -222,27 +196,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @return Spaces.
-     */
-    public Collection<String> spaces() {
-        return spaces;
-    }
-
-    /**
-     * @param spaces Spaces.
-     */
-    public void spaces(Collection<String> spaces) {
-        this.spaces = spaces;
-    }
-
-    /**
-     * @return Schemas.
-     */
-    public Set<String> schemas() {
-        return schemas;
-    }
-
-    /**
      * @return {@code True} If query is local.
      */
     public boolean isLocal() {
@@ -262,11 +215,9 @@ public class GridCacheTwoStepQuery {
     public GridCacheTwoStepQuery copy() {
         assert !explain;
 
-        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls);
+        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, tbls);
 
-        cp.caches = caches;
-        cp.extraCaches = extraCaches;
-        cp.spaces = spaces;
+        cp.cacheIds = cacheIds;
         cp.rdc = rdc.copy();
         cp.skipMergeTbl = skipMergeTbl;
         cp.pageSize = pageSize;
@@ -279,9 +230,16 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
+     * @return Number of tables.
+     */
+    public int tablesCount() {
+        return tbls.size();
+    }
+
+    /**
      * @return Tables.
      */
-    public Set<String> tables() {
+    public Set<QueryTable> tables() {
         return tbls;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java
new file mode 100644
index 0000000..54f5f03
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Query table descriptor.
+ */
+public class QueryTable implements Message {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Schema. */
+    private String schema;
+
+    /** Table. */
+    private String tbl;
+
+    /**
+     * Default constructor.
+     */
+    public QueryTable() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param schema Schema.
+     * @param tbl Table.
+     */
+    public QueryTable(String schema, String tbl) {
+        this.schema = schema;
+        this.tbl = tbl;
+    }
+
+    /**
+     * @return Schema.
+     */
+    public String schema() {
+        return schema;
+    }
+
+    /**
+     * @return Table.
+     */
+    public String table() {
+        return tbl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeString("schema", schema))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeString("tbl", tbl))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                schema = reader.readString("schema");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                tbl = reader.readString("tbl");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+        }
+
+        return reader.afterMessageRead(QueryTable.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return -54;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * (schema != null ? schema.hashCode() : 0) + (tbl != null ? tbl.hashCode() : 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (obj instanceof QueryTable) {
+            QueryTable other = (QueryTable)obj;
+
+            return F.eq(tbl, other.tbl) && F.eq(schema, other.schema);
+        }
+
+        return super.equals(obj);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(QueryTable.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 599baa1..0874ddc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -88,6 +89,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -289,7 +291,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** */
     private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask;
 
-    /**
+    /*
      * Command in H2 prepared statement.
      */
     static {
@@ -397,7 +399,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private DdlStatementsProcessor ddlProc;
 
     /** */
-    private final ConcurrentMap<String, GridH2Table> dataTables = new ConcurrentHashMap8<>();
+    private final ConcurrentMap<QueryTable, GridH2Table> dataTables = new ConcurrentHashMap8<>();
 
     /** Statement cache. */
     private final ConcurrentHashMap<Thread, StatementCache> stmtCache = new ConcurrentHashMap<>();
@@ -1672,44 +1674,33 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     }
                 }
 
-                List<Integer> caches;
-                List<Integer> extraCaches = null;
+                LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
 
                 // Setup spaces from schemas.
-                if (!twoStepQry.schemas().isEmpty()) {
-                    Collection<String> spaces = new ArrayList<>(twoStepQry.schemas().size());
-                    caches = new ArrayList<>(twoStepQry.schemas().size() + 1);
-                    caches.add(cctx.cacheId());
+                assert twoStepQry != null;
 
-                    for (String schema : twoStepQry.schemas()) {
-                        String space0 = space(schema);
+                int tblCnt = twoStepQry.tablesCount();
 
-                        spaces.add(space0);
+                if (tblCnt > 0) {
+                    caches0.add(cctx.cacheId());
 
-                        if (!F.eq(space0, space)) {
-                            int cacheId = CU.cacheId(space0);
+                    for (QueryTable table : twoStepQry.tables()) {
+                        String cacheName = cacheNameForSchemaAndTable(table.schema(), table.table());
 
-                            caches.add(cacheId);
+                        int cacheId = CU.cacheId(cacheName);
 
-                            if (extraCaches == null)
-                                extraCaches = new ArrayList<>();
-
-                            extraCaches.add(cacheId);
-                        }
+                        caches0.add(cacheId);
                     }
-
-                    twoStepQry.spaces(spaces);
-                }
-                else {
-                    caches = Collections.singletonList(cctx.cacheId());
-                    extraCaches = null;
                 }
+                else
+                    caches0.add(cctx.cacheId());
 
                 //Prohibit usage indices with different numbers of segments in same query.
-                checkCacheIndexSegmentation(caches);
+                List<Integer> cacheIds = new ArrayList<>(caches0);
 
-                twoStepQry.caches(caches);
-                twoStepQry.extraCaches(extraCaches);
+                checkCacheIndexSegmentation(cacheIds);
+
+                twoStepQry.cacheIds(cacheIds);
                 twoStepQry.local(qry.isLocal());
 
                 meta = meta(stmt.getMetaData());
@@ -1750,6 +1741,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Get cache name for schema and table.
+     *
+     * @param schemaName Schema name.
+     * @param tblName Table name.
+     * @return Cache name.
+     */
+    private String cacheNameForSchemaAndTable(String schemaName, String tblName) {
+        // TODO: This needs to be changed.
+        return space(schemaName);
+    }
+
+    /**
      * @throws IllegalStateException if segmented indices used with non-segmented indices.
      */
     private void checkCacheIndexSegmentation(List<Integer> caches) {
@@ -2007,15 +2010,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             addInitialUserIndex(spaceName, tbl, usrIdx);
 
         if (dataTables.putIfAbsent(h2Tbl.identifier(), h2Tbl) != null)
-            throw new IllegalStateException("Table already exists: " + h2Tbl.identifier());
-    }
-
-    /**
-     * @param identifier Table identifier.
-     * @return Data table.
-     */
-    public GridH2Table dataTable(String identifier) {
-        return dataTables.get(identifier);
+            throw new IllegalStateException("Table already exists: " + h2Tbl.identifierString());
     }
 
     /**
@@ -2026,12 +2021,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Table or {@code null} if none found.
      */
     public GridH2Table dataTable(String schemaName, String tblName) {
-        for (GridH2Table tbl : dataTables.values()) {
-            if (tbl.getSchema().getName().equals(schemaName) && tbl.getName().equals(tblName))
-                return tbl;
-        }
+        return dataTable(new QueryTable(schemaName, tblName));
+    }
 
-        return null;
+    /**
+     * Find table by its identifier.
+     *
+     * @param tbl Identifier.
+     * @return Table or {@code null} if none found.
+     */
+    public GridH2Table dataTable(QueryTable tbl) {
+        return dataTables.get(tbl);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 623da09..12850f4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -132,7 +132,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
 
             log = ctx.log(getClass());
 
-            msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifier() + '.' + getName());
+            msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' + getName());
 
             msgLsnr = new GridMessageListener() {
                 @Override public void onMessage(UUID nodeId, Object msg) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index a00ea90..37c03e3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
 import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
@@ -47,7 +48,6 @@ import org.h2.message.DbException;
 import org.h2.result.Row;
 import org.h2.result.SearchRow;
 import org.h2.result.SortOrder;
-import org.h2.table.Column;
 import org.h2.table.IndexColumn;
 import org.h2.table.TableBase;
 import org.h2.table.TableType;
@@ -58,7 +58,6 @@ import org.jsr166.LongAdder8;
 
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 
 /**
@@ -104,6 +103,12 @@ public class GridH2Table extends TableBase {
     /** */
     private volatile boolean rebuildFromHashInProgress;
 
+    /** Identifier. */
+    private final QueryTable identifier;
+
+    /** Identifier as string. */
+    private final String identifierStr;
+
     /**
      * Creates table.
      *
@@ -149,6 +154,10 @@ public class GridH2Table extends TableBase {
 
         this.rowFactory = rowFactory;
 
+        identifier = new QueryTable(getSchema().getName(), getName());
+
+        identifierStr = identifier.schema() + "." + identifier.table();
+
         // Indexes must be created in the end when everything is ready.
         idxs = idxsFactory.createSystemIndexes(this);
 
@@ -221,7 +230,7 @@ public class GridH2Table extends TableBase {
         if (destroyed) {
             unlock(exclusive);
 
-            throw new IllegalStateException("Table " + identifier() + " already destroyed.");
+            throw new IllegalStateException("Table " + identifierString() + " already destroyed.");
         }
 
         if (snapshotInLock())
@@ -293,8 +302,15 @@ public class GridH2Table extends TableBase {
     /**
      * @return Table identifier.
      */
-    public String identifier() {
-        return getSchema().getName() + '.' + getName();
+    public QueryTable identifier() {
+        return identifier;
+    }
+
+    /**
+     * @return Table identifier as string.
+     */
+    public String identifierString() {
+        return identifierStr;
     }
 
     /**
@@ -352,7 +368,7 @@ public class GridH2Table extends TableBase {
      */
     private void ensureNotDestroyed() {
         if (destroyed)
-            throw new IllegalStateException("Table " + identifier() + " already destroyed.");
+            throw new IllegalStateException("Table " + identifierString() + " already destroyed.");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 26c6b08..9f01346 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -93,11 +94,8 @@ public class GridSqlQuerySplitter {
     /** */
     private int splitId = -1; // The first one will be 0.
 
-    /** */
-    private Set<String> schemas = new HashSet<>();
-
-    /** */
-    private Set<String> tbls = new HashSet<>();
+    /** Query tables. */
+    private Set<QueryTable> tbls = new HashSet<>();
 
     /** */
     private boolean rdcQrySimple;
@@ -224,7 +222,7 @@ public class GridSqlQuerySplitter {
         }
 
         // Setup resulting two step query and return it.
-        GridCacheTwoStepQuery twoStepQry = new GridCacheTwoStepQuery(originalSql, splitter.schemas, splitter.tbls);
+        GridCacheTwoStepQuery twoStepQry = new GridCacheTwoStepQuery(originalSql, splitter.tbls);
 
         twoStepQry.reduceQuery(splitter.rdcSqlQry);
 
@@ -1500,15 +1498,10 @@ public class GridSqlQuerySplitter {
         if (from instanceof GridSqlTable) {
             GridSqlTable tbl = (GridSqlTable)from;
 
-            String schema = tbl.schema();
-
-            boolean addSchema = tbls == null;
-
-            if (tbls != null)
-                addSchema = tbls.add(tbl.dataTable().identifier());
+            String schemaName = tbl.dataTable().identifier().schema();
+            String tblName = tbl.dataTable().identifier().table();
 
-            if (addSchema && schema != null && schemas != null)
-                schemas.add(schema);
+            tbls.add(new QueryTable(schemaName, tblName));
 
             // In case of alias parent we need to replace the alias itself.
             if (!prntAlias)

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 1444209..43cc230 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservabl
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
@@ -101,7 +102,7 @@ public class GridMapQueryExecutor {
     /** */
     private static final Field RESULT_FIELD;
 
-    /**
+    /*
      * Initialize.
      */
     static {
@@ -514,7 +515,7 @@ public class GridMapQueryExecutor {
         AffinityTopologyVersion topVer,
         Map<UUID, int[]> partsMap,
         int[] parts,
-        Collection<String> tbls,
+        Collection<QueryTable> tbls,
         int pageSize,
         DistributedJoinMode distributedJoinMode,
         boolean enforceJoinOrder,
@@ -567,14 +568,14 @@ public class GridMapQueryExecutor {
             if (!F.isEmpty(tbls)) {
                 snapshotedTbls = new ArrayList<>(tbls.size());
 
-                for (String identifier : tbls) {
-                    GridH2Table tbl = h2.dataTable(identifier);
+                for (QueryTable tbl : tbls) {
+                    GridH2Table h2Tbl = h2.dataTable(tbl);
 
-                    Objects.requireNonNull(tbl, identifier);
+                    Objects.requireNonNull(h2Tbl, tbl.toString());
 
-                    tbl.snapshotIndexes(qctx);
+                    h2Tbl.snapshotIndexes(qctx);
 
-                    snapshotedTbls.add(tbl);
+                    snapshotedTbls.add(h2Tbl);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3d81cb5..75914ef 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -83,7 +83,6 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.h2.command.ddl.CreateTableData;
 import org.h2.engine.Session;
@@ -348,19 +347,13 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param cctx Cache context for main space.
-     * @param extraSpaces Extra spaces.
+     * @param cacheIds Cache IDs.
      * @return {@code true} If preloading is active.
      */
-    private boolean isPreloadingActive(final GridCacheContext<?, ?> cctx, List<Integer> extraSpaces) {
-        if (hasMovingPartitions(cctx))
-            return true;
-
-        if (extraSpaces != null) {
-            for (int i = 0; i < extraSpaces.size(); i++) {
-                if (hasMovingPartitions(cacheContext(extraSpaces.get(i))))
-                    return true;
-            }
+    private boolean isPreloadingActive(List<Integer> cacheIds) {
+        for (Integer cacheId : cacheIds) {
+            if (hasMovingPartitions(cacheContext(cacheId)))
+                return true;
         }
 
         return false;
@@ -439,17 +432,14 @@ public class GridReduceQueryExecutor {
     /**
      * @param isReplicatedOnly If we must only have replicated caches.
      * @param topVer Topology version.
-     * @param cctx Cache context for main space.
-     * @param extraSpaces Extra spaces.
+     * @param cacheIds Participating cache IDs.
      * @param parts Partitions.
      * @return Data nodes or {@code null} if repartitioning started and we need to retry.
      */
-    private Map<ClusterNode, IntArray> stableDataNodes(
-            boolean isReplicatedOnly,
-            AffinityTopologyVersion topVer,
-            final GridCacheContext<?, ?> cctx,
-            List<Integer> extraSpaces,
-            int[] parts) {
+    private Map<ClusterNode, IntArray> stableDataNodes(boolean isReplicatedOnly, AffinityTopologyVersion topVer,
+        List<Integer> cacheIds, int[] parts) {
+        GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0));
+
         Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts);
 
         Set<ClusterNode> nodes = map.keySet();
@@ -457,54 +447,53 @@ public class GridReduceQueryExecutor {
         if (F.isEmpty(map))
             throw new CacheException("Failed to find data nodes for cache: " + cctx.name());
 
-        if (!F.isEmpty(extraSpaces)) {
-            for (int i = 0; i < extraSpaces.size(); i++) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i));
+        for (int i = 1; i < cacheIds.size(); i++) {
+            GridCacheContext<?,?> extraCctx = cacheContext(cacheIds.get(i));
 
-                String extraSpace = extraCctx.name();
+            String extraCacheName = extraCctx.name();
 
-                if (extraCctx.isLocal())
-                    continue; // No consistency guaranties for local caches.
+            if (extraCctx.isLocal())
+                continue; // No consistency guaranties for local caches.
 
-                if (isReplicatedOnly && !extraCctx.isReplicated())
-                    throw new CacheException("Queries running on replicated cache should not contain JOINs " +
-                        "with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");
+            if (isReplicatedOnly && !extraCctx.isReplicated())
+                throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+                    "with partitioned tables [replicatedCache=" + cctx.name() +
+                    ", partitionedCache=" + extraCacheName + "]");
 
-                Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet();
+            Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet();
 
-                if (F.isEmpty(extraNodes))
-                    throw new CacheException("Failed to find data nodes for cache: " + extraSpace);
+            if (F.isEmpty(extraNodes))
+                throw new CacheException("Failed to find data nodes for cache: " + extraCacheName);
 
-                if (isReplicatedOnly && extraCctx.isReplicated()) {
-                    nodes.retainAll(extraNodes);
+            if (isReplicatedOnly && extraCctx.isReplicated()) {
+                nodes.retainAll(extraNodes);
 
-                    if (map.isEmpty()) {
-                        if (isPreloadingActive(cctx, extraSpaces))
-                            return null; // Retry.
-                        else
-                            throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
-                                ", cache2=" + extraSpace + "]");
-                    }
-                }
-                else if (!isReplicatedOnly && extraCctx.isReplicated()) {
-                    if (!extraNodes.containsAll(nodes))
-                        if (isPreloadingActive(cctx, extraSpaces))
-                            return null; // Retry.
-                        else
-                            throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
-                                ", cache2=" + extraSpace + "]");
-                }
-                else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
-                    if (!extraNodes.equals(nodes))
-                        if (isPreloadingActive(cctx, extraSpaces))
-                            return null; // Retry.
-                        else
-                            throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
-                                ", cache2=" + extraSpace + "]");
+                if (map.isEmpty()) {
+                    if (isPreloadingActive(cacheIds))
+                        return null; // Retry.
+                    else
+                        throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
+                            ", cache2=" + extraCacheName + "]");
                 }
-                else
-                    throw new IllegalStateException();
             }
+            else if (!isReplicatedOnly && extraCctx.isReplicated()) {
+                if (!extraNodes.containsAll(nodes))
+                    if (isPreloadingActive(cacheIds))
+                        return null; // Retry.
+                    else
+                        throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
+                            ", cache2=" + extraCacheName + "]");
+            }
+            else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
+                if (!extraNodes.equals(nodes))
+                    if (isPreloadingActive(cacheIds))
+                        return null; // Retry.
+                    else
+                        throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
+                            ", cache2=" + extraCacheName + "]");
+            }
+            else
+                throw new IllegalStateException();
         }
 
         return map;
@@ -537,8 +526,6 @@ public class GridReduceQueryExecutor {
         final boolean isReplicatedOnly = qry.isReplicatedOnly();
 
         // Fail if all caches are replicated and explicit partitions are set.
-
-
         for (int attempt = 0;; attempt++) {
             if (attempt != 0) {
                 try {
@@ -561,7 +548,7 @@ public class GridReduceQueryExecutor {
 
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
-            List<Integer> extraSpaces = qry.extraCaches();
+            List<Integer> cacheIds = qry.cacheIds();
 
             Collection<ClusterNode> nodes = null;
 
@@ -572,29 +559,29 @@ public class GridReduceQueryExecutor {
             Map<ClusterNode, IntArray> qryMap = null;
 
             // Partitions are not supported for queries over all replicated caches.
-            if (cctx.isReplicated() && parts != null) {
-                boolean failIfReplicatedOnly = true;
+            if (parts != null) {
+                boolean replicatedOnly = true;
 
-                for (Integer cacheId : extraSpaces) {
+                for (Integer cacheId : cacheIds) {
                     if (!cacheContext(cacheId).isReplicated()) {
-                        failIfReplicatedOnly = false;
+                        replicatedOnly = false;
 
                         break;
                     }
                 }
 
-                if (failIfReplicatedOnly)
+                if (replicatedOnly)
                     throw new CacheException("Partitions are not supported for replicated caches");
             }
 
             if (qry.isLocal())
                 nodes = singletonList(ctx.discovery().localNode());
             else {
-                if (isPreloadingActive(cctx, extraSpaces)) {
+                if (isPreloadingActive(cacheIds)) {
                     if (isReplicatedOnly)
-                        nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
+                        nodes = replicatedUnstableDataNodes(cacheIds);
                     else {
-                        partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
+                        partsMap = partitionedUnstableDataNodes(cacheIds);
 
                         if (partsMap != null) {
                             qryMap = narrowForQuery(partsMap, parts);
@@ -602,8 +589,9 @@ public class GridReduceQueryExecutor {
                             nodes = qryMap == null ? null : qryMap.keySet();
                         }
                     }
-                } else {
-                    qryMap = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces, parts);
+                }
+                else {
+                    qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts);
 
                     if (qryMap != null)
                         nodes = qryMap.keySet();
@@ -633,7 +621,7 @@ public class GridReduceQueryExecutor {
             final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
 
             final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 1 :
-                findFirstPartitioned(cctx, extraSpaces).config().getQueryParallelism();
+                findFirstPartitioned(cacheIds).config().getQueryParallelism();
 
             int replicatedQrysCnt = 0;
 
@@ -731,7 +719,7 @@ public class GridReduceQueryExecutor {
                                 .requestId(qryReqId)
                                 .topologyVersion(topVer)
                                 .pageSize(r.pageSize)
-                                .caches(qry.caches())
+                                .caches(qry.cacheIds())
                                 .tables(distributedJoins ? qry.tables() : null)
                                 .partitions(convert(partsMap))
                                 .queries(mapQrys)
@@ -873,22 +861,18 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param cctx Cache context for main space.
-     * @param extraSpaces Extra spaces.
+     * @param cacheIds Cache IDs.
      * @return The first partitioned cache context.
      */
-    private GridCacheContext<?,?> findFirstPartitioned(GridCacheContext<?,?> cctx, List<Integer> extraSpaces) {
-        if (cctx.isLocal())
-            throw new CacheException("Cache is LOCAL: " + cctx.name());
-
-        if (!cctx.isReplicated())
-            return cctx;
+    private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
+        for (int i = 0; i < cacheIds.size(); i++) {
+            GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i));
 
-        for (int i = 0 ; i < extraSpaces.size(); i++) {
-            GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+            if (i == 0 && cctx.isLocal())
+                throw new CacheException("Cache is LOCAL: " + cctx.name());
 
-            if (!extraCctx.isReplicated() && !extraCctx.isLocal())
-                return extraCctx;
+            if (!cctx.isReplicated() && !cctx.isLocal())
+                return cctx;
         }
 
         throw new IllegalStateException("Failed to find partitioned cache.");
@@ -997,20 +981,20 @@ public class GridReduceQueryExecutor {
     /**
      * Calculates data nodes for replicated caches on unstable topology.
      *
-     * @param cctx Cache context for main space.
-     * @param extraSpaces Extra spaces.
+     * @param cacheIds Cache IDs.
      * @return Collection of all data nodes owning all the caches or {@code null} for retry.
      */
-    private Collection<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?, ?> cctx,
-        List<Integer> extraSpaces) {
+    private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds) {
         int i = 0;
 
+        GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++));
+
         // The main cache is allowed to be partitioned.
         if (!cctx.isReplicated()) {
-            assert !F.isEmpty(extraSpaces): "no extra replicated caches with partitioned main cache";
+            assert cacheIds.size() > 1: "no extra replicated caches with partitioned main cache";
 
             // Just replace the main cache with the first one extra.
-            cctx = cacheContext(extraSpaces.get(i++));
+            cctx = cacheContext(cacheIds.get(i++));
 
             assert cctx.isReplicated(): "all the extra caches must be replicated here";
         }
@@ -1020,27 +1004,26 @@ public class GridReduceQueryExecutor {
         if (F.isEmpty(nodes))
             return null; // Retry.
 
-        if (!F.isEmpty(extraSpaces)) {
-            for (;i < extraSpaces.size(); i++) {
-                GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+        for (;i < cacheIds.size(); i++) {
+            GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(i));
 
-                if (extraCctx.isLocal())
-                    continue;
+            if (extraCctx.isLocal())
+                continue;
 
-                if (!extraCctx.isReplicated())
-                    throw new CacheException("Queries running on replicated cache should not contain JOINs " +
-                        "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraCctx.name() + "]");
+            if (!extraCctx.isReplicated())
+                throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+                    "with tables in partitioned caches [replicatedCache=" + cctx.name() + ", " +
+                    "partitionedCache=" + extraCctx.name() + "]");
 
-                Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
+            Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
 
-                if (F.isEmpty(extraOwners))
-                    return null; // Retry.
+            if (F.isEmpty(extraOwners))
+                return null; // Retry.
 
-                nodes.retainAll(extraOwners);
+            nodes.retainAll(extraOwners);
 
-                if (nodes.isEmpty())
-                    return null; // Retry.
-            }
+            if (nodes.isEmpty())
+                return null; // Retry.
         }
 
         return nodes;
@@ -1092,23 +1075,19 @@ public class GridReduceQueryExecutor {
     /**
      * Calculates partition mapping for partitioned cache on unstable topology.
      *
-     * @param cctx Cache context for main space.
-     * @param extraSpaces Extra spaces.
+     * @param cacheIds Cache IDs.
      * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
      */
     @SuppressWarnings("unchecked")
-    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(GridCacheContext<?,?> cctx,
-        List<Integer> extraSpaces) {
-        assert !cctx.isLocal() : cctx.name() + " must not be LOCAL";
-
+    private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds) {
         // If the main cache is replicated, just replace it with the first partitioned.
-        cctx = findFirstPartitioned(cctx, extraSpaces);
+        GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds);
 
         final int partsCnt = cctx.affinity().partitions();
 
-        if (extraSpaces != null) { // Check correct number of partitions for partitioned caches.
-            for (int i = 0; i < extraSpaces.size(); i++) {
-                GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+        if (cacheIds.size() > 1) { // Check correct number of partitions for partitioned caches.
+            for (Integer cacheId : cacheIds) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
 
                 if (extraCctx.isReplicated() || extraCctx.isLocal())
                     continue;
@@ -1117,14 +1096,15 @@ public class GridReduceQueryExecutor {
 
                 if (parts != partsCnt)
                     throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" +
-                        cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + ", parts2=" + parts + "]");
+                        cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() +
+                        ", parts2=" + parts + "]");
             }
         }
 
         Set<ClusterNode>[] partLocs = new Set[partsCnt];
 
         // Fill partition locations for main cache.
-        for (int p = 0, parts =  cctx.affinity().partitions(); p < parts; p++) {
+        for (int p = 0; p < partsCnt; p++) {
             List<ClusterNode> owners = cctx.topology().owners(p);
 
             if (F.isEmpty(owners)) {
@@ -1143,11 +1123,11 @@ public class GridReduceQueryExecutor {
             partLocs[p] = new HashSet<>(owners);
         }
 
-        if (extraSpaces != null) {
+        if (cacheIds.size() > 1) {
             // Find owner intersections for each participating partitioned cache partition.
             // We need this for logical collocation between different partitioned caches with the same affinity.
-            for (int i = 0; i < extraSpaces.size(); i++) {
-                GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+            for (Integer cacheId : cacheIds) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
 
                 // This is possible if we have replaced a replicated cache with a partitioned one earlier.
                 if (cctx == extraCctx)
@@ -1156,7 +1136,7 @@ public class GridReduceQueryExecutor {
                 if (extraCctx.isReplicated() || extraCctx.isLocal())
                     continue;
 
-                for (int p = 0, parts =  extraCctx.affinity().partitions(); p < parts; p++) {
+                for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) {
                     List<ClusterNode> owners = extraCctx.topology().owners(p);
 
                     if (partLocs[p] == UNMAPPED_PARTS)
@@ -1166,7 +1146,8 @@ public class GridReduceQueryExecutor {
                         if (!F.isEmpty(dataNodes(extraCctx.name(), NONE)))
                             return null; // Retry.
 
-                        throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]");
+                        throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() +
+                            ", part=" + p + "]");
                     }
 
                     if (partLocs[p] == null)
@@ -1181,8 +1162,8 @@ public class GridReduceQueryExecutor {
             }
 
             // Filter nodes where not all the replicated caches loaded.
-            for (int i = 0; i < extraSpaces.size(); i++) {
-                GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i));
+            for (Integer cacheId : cacheIds) {
+                GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
 
                 if (!extraCctx.isReplicated())
                     continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 17bb9f6..beb1ae2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -110,8 +111,8 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
     /** */
     @GridToStringInclude
-    @GridDirectCollection(String.class)
-    private Collection<String> tbls;
+    @GridDirectCollection(Message.class)
+    private Collection<QueryTable> tbls;
 
     /** */
     private int timeout;
@@ -173,7 +174,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
      * @param tbls Tables.
      * @return {@code this}.
      */
-    public GridH2QueryRequest tables(Collection<String> tbls) {
+    public GridH2QueryRequest tables(Collection<QueryTable> tbls) {
         this.tbls = tbls;
 
         return this;
@@ -182,7 +183,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     /**
      * @return Tables.
      */
-    public Collection<String> tables() {
+    public Collection<QueryTable> tables() {
         return tbls;
     }
 
@@ -434,7 +435,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING))
+                if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
@@ -527,7 +528,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
                 reader.incrementState();
 
             case 7:
-                tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING);
+                tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index 3a825f7..18b1afb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -108,6 +109,9 @@ public class GridH2ValueMessageFactory implements MessageFactory {
 
             case -35:
                 return new GridH2RowRangeBounds();
+
+            case -54:
+                return new QueryTable();
         }
 
         return null;


[3/8] ignite git commit: ignite-4763 doSetRollbackOnly method to be implemented in the SpringTransactionManager

Posted by sb...@apache.org.
ignite-4763 doSetRollbackOnly method to be implemented in the SpringTransactionManager


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c4bb996d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c4bb996d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c4bb996d

Branch: refs/heads/ignite-5075-pds
Commit: c4bb996d86f80f1009d6efb85f5c659048bb0c48
Parents: 561d2cf
Author: NSAmelchev <ns...@gmail.com>
Authored: Wed Apr 26 16:17:19 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Mon May 22 16:10:40 2017 +0300

----------------------------------------------------------------------
 .../spring/SpringTransactionManager.java        | 13 ++++++
 .../GridSpringTransactionManagerSelfTest.java   | 45 ++++++++++++++++++++
 2 files changed, 58 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c4bb996d/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
index 2fe8aad..d09656e 100644
--- a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
+++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
@@ -446,6 +446,19 @@ public class SpringTransactionManager extends AbstractPlatformTransactionManager
     }
 
     /** {@inheritDoc} */
+    @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
+        IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction();
+        Transaction tx = txObj.getTransactionHolder().getTransaction();
+
+        assert tx != null;
+
+        if (status.isDebug() && log.isDebugEnabled())
+            log.debug("Setting Ignite transaction rollback-only: " + tx);
+
+        tx.setRollbackOnly();
+    }
+
+    /** {@inheritDoc} */
     @Override protected void doCleanupAfterCompletion(Object transaction) {
         IgniteTransactionObject txObj = (IgniteTransactionObject)transaction;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c4bb996d/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
index 8640c0b..e68ddb4 100644
--- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
@@ -25,10 +25,14 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.support.GenericXmlApplicationContext;
 import org.springframework.transaction.IllegalTransactionStateException;
 import org.springframework.transaction.InvalidIsolationLevelException;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
 
 /**
  * Spring transaction test.
@@ -144,4 +148,45 @@ public class GridSpringTransactionManagerSelfTest extends GridCommonAbstractTest
 
         assertEquals(0, c.size());
     }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testDoSetRollbackOnlyInExistingTransaction() throws Exception {
+        final IgniteCache<Integer, String> c = grid().cache(CACHE_NAME);
+
+        SpringTransactionManager mngr = new SpringTransactionManager();
+        mngr.setIgniteInstanceName(grid().name());
+        mngr.afterPropertiesSet();
+
+        TransactionTemplate txTmpl = new TransactionTemplate(mngr);
+
+        try {
+            txTmpl.execute(new TransactionCallback<Object>() {
+                @Override public Object doInTransaction(TransactionStatus status) {
+                    c.put(1, "1");
+
+                    Transaction tx = grid().transactions().tx();
+
+                    assertFalse(tx.isRollbackOnly());
+
+                    try {
+                        service.putWithError(c, 1_000);
+                    }
+                    catch (Exception ignored) {
+                        // No-op.
+                    }
+
+                    assertTrue(tx.isRollbackOnly());
+
+                    return null;
+                }
+            });
+        }
+        catch (Exception ignored) {
+            // No-op.
+        }
+
+        assertEquals(0, c.size());
+    }
 }


[5/8] ignite git commit: ignite-5075

Posted by sb...@apache.org.
ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e8c8311
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e8c8311
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e8c8311

Branch: refs/heads/ignite-5075-pds
Commit: 6e8c83115b8d1fdc2d0c59363369015477d63aaa
Parents: 0e49e91
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 23 13:37:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 23 13:37:22 2017 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/configuration/CacheConfiguration.java | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6e8c8311/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 5f44811..ff51a63 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -461,8 +461,10 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         return grpName;
     }
 
-    public void setGroupName(String groupName) {
+    public CacheConfiguration<K, V> setGroupName(String groupName) {
         this.grpName = groupName;
+
+        return this;
     }
 
     /**


[8/8] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-5075' into ignite-5075-pds

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-5075' into ignite-5075-pds


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b86104be
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b86104be
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b86104be

Branch: refs/heads/ignite-5075-pds
Commit: b86104be7f0c26bfda394d448b3276e2d5eef803
Parents: aaf9f45 7fc0168
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 23 15:08:34 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 23 15:08:34 2017 +0300

----------------------------------------------------------------------
 .../store/cassandra/CassandraCacheStore.java    |  15 +-
 .../ignite/tests/IgnitePersistentStoreTest.java |  62 ++++-
 .../persistence/loadall_blob/ignite-config.xml  |  90 +++++++
 .../loadall_blob/persistence-settings.xml       |  29 +++
 .../store/jdbc/CacheAbstractJdbcStore.java      |   6 +-
 .../configuration/CacheConfiguration.java       |   4 +-
 .../communication/GridIoMessageFactory.java     |   2 +
 .../cache/CacheGroupInfrastructure.java         | 172 +++++++-------
 .../platform/cache/PlatformCache.java           |  28 ++-
 .../cache/query/GridCacheTwoStepQuery.java      |  84 ++-----
 .../processors/cache/query/QueryTable.java      | 164 +++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java   |  84 +++----
 .../query/h2/opt/GridH2IndexBase.java           |   2 +-
 .../processors/query/h2/opt/GridH2Table.java    |  28 ++-
 .../query/h2/sql/GridSqlQuerySplitter.java      |  21 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  15 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 233 +++++++++----------
 .../h2/twostep/msg/GridH2QueryRequest.java      |  13 +-
 .../twostep/msg/GridH2ValueMessageFactory.java  |   4 +
 .../core/include/ignite/cache/query/query_sql.h |   2 +
 .../ignite/cache/query/query_sql_fields.h       |   3 +
 .../Cache/Query/CacheLinqTest.cs                |  36 ++-
 .../Cache/Query/CacheQueriesTest.cs             |  60 ++++-
 .../Cache/Query/SqlFieldsQuery.cs               |  29 ++-
 .../Apache.Ignite.Core/Cache/Query/SqlQuery.cs  |  31 +++
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  |   3 +
 .../Apache.Ignite.Linq/CacheExtensions.cs       |  10 +
 .../Impl/CacheFieldsQueryExecutor.cs            |  42 ++--
 .../Apache.Ignite.Linq/Impl/CacheQueryable.cs   |   3 +-
 .../dotnet/Apache.Ignite.Linq/QueryOptions.cs   |  23 ++
 .../spring/SpringTransactionManager.java        |  13 ++
 .../GridSpringTransactionManagerSelfTest.java   |  45 ++++
 32 files changed, 958 insertions(+), 398 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b86104be/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/b86104be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index e717670,7c07d2d..34feeaf
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@@ -20,8 -20,8 +20,9 @@@ package org.apache.ignite.internal.proc
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collection;
+ import java.util.Iterator;
  import java.util.List;
 +import java.util.Set;
  import java.util.UUID;
  import org.apache.ignite.IgniteCheckedException;
  import org.apache.ignite.IgniteLogger;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b86104be/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------


[6/8] ignite git commit: ignite-5075

Posted by sb...@apache.org.
ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19ed098e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19ed098e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19ed098e

Branch: refs/heads/ignite-5075-pds
Commit: 19ed098ee02667c2b6e359f6f0c39cead8b01569
Parents: 6e8c831
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 23 14:44:42 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 23 14:44:42 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheGroupInfrastructure.java         | 172 ++++++++++---------
 1 file changed, 90 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19ed098e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 6d61ab1..7c07d2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
@@ -97,7 +98,7 @@ public class CacheGroupInfrastructure {
     private boolean needsRecovery;
 
     /** */
-    private final List<GridCacheContext> caches;
+    private volatile List<GridCacheContext> caches;
 
     /** */
     private final IgniteLogger log;
@@ -235,14 +236,14 @@ public class CacheGroupInfrastructure {
      * @return {@code True} if group contains cache with given name.
      */
     public boolean hasCache(String cacheName) {
-        synchronized (caches) {
-            for (int i = 0; i < caches.size(); i++) {
-                if (caches.get(i).name().equals(cacheName))
-                    return true;
-            }
+        List<GridCacheContext> caches = this.caches;
 
-            return false;
+        for (int i = 0; i < caches.size(); i++) {
+            if (caches.get(i).name().equals(cacheName))
+                return true;
         }
+
+        return false;
     }
 
     /**
@@ -252,51 +253,58 @@ public class CacheGroupInfrastructure {
         assert cacheType.userCache() == cctx.userCache() : cctx.name();
         assert grpId == cctx.groupId() : cctx.name();
 
-        synchronized (caches) {
-            assert sharedGroup() || caches.isEmpty();
+        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
 
-            boolean add = caches.add(cctx);
+        assert sharedGroup() || caches.isEmpty();
 
-            assert add : cctx.name();
-        }
+        boolean add = caches.add(cctx);
+
+        assert add : cctx.name();
+
+        this.caches = caches;
     }
 
     /**
      * @param cctx Cache context.
      */
     private void removeCacheContext(GridCacheContext cctx) {
-        synchronized (caches) {
-            if (caches.contains(cctx)) { // It is possible cache is not added in case of errors on cache start.
-                assert sharedGroup() || caches.size() == 1 : caches.size();
+        ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
+
+        // It is possible cache was not added in case of errors on cache start.
+        for (Iterator<GridCacheContext> it = caches.iterator(); it.hasNext();) {
+            GridCacheContext next = it.next();
 
-                boolean rmv = caches.remove(cctx);
+            if (next == cctx) {
+                assert sharedGroup() || caches.size() == 1 : caches.size();
 
-                assert rmv : cctx.name();
+                it.remove();
             }
         }
+
+        this.caches = caches;
     }
 
     /**
      * @return Cache context if group contains single cache.
      */
     public GridCacheContext singleCacheContext() {
-        synchronized (caches) {
-            assert !sharedGroup() && caches.size() == 1;
+        List<GridCacheContext> caches = this.caches;
 
-            return caches.get(0);
-        }
+        assert !sharedGroup() && caches.size() == 1;
+
+        return caches.get(0);
     }
 
     /**
      *
      */
     public void unwindUndeploys() {
-        synchronized (caches) {
-            for (int i = 0; i < caches.size(); i++) {
-                GridCacheContext cctx = caches.get(i);
+        List<GridCacheContext> caches = this.caches;
 
-                cctx.deploy().unwind(cctx);
-            }
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            cctx.deploy().unwind(cctx);
         }
     }
 
@@ -326,20 +334,20 @@ public class CacheGroupInfrastructure {
         if (!eventRecordable(type))
             LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type));
 
-        synchronized (caches) {
-            for (int i = 0; i < caches.size(); i++) {
-                GridCacheContext cctx = caches.get(i);
-
-                if (cctx.recordEvent(type)) {
-                    cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
-                        cctx.localNode(),
-                        "Cache rebalancing event.",
-                        type,
-                        part,
-                        discoNode,
-                        discoType,
-                        discoTs));
-                }
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            if (cctx.recordEvent(type)) {
+                cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+                    cctx.localNode(),
+                    "Cache rebalancing event.",
+                    type,
+                    part,
+                    discoNode,
+                    discoType,
+                    discoTs));
             }
         }
     }
@@ -353,19 +361,19 @@ public class CacheGroupInfrastructure {
             LT.warn(log, "Added event without checking if event is recordable: " +
                 U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED));
 
-        synchronized (caches) {
-            for (int i = 0; i < caches.size(); i++) {
-                GridCacheContext cctx = caches.get(i);
+        List<GridCacheContext> caches = this.caches;
 
-                cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
-                    cctx.localNode(),
-                    "Cache unloading event.",
-                    EVT_CACHE_REBALANCE_PART_UNLOADED,
-                    part,
-                    null,
-                    0,
-                    0));
-            }
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+                cctx.localNode(),
+                "Cache unloading event.",
+                EVT_CACHE_REBALANCE_PART_UNLOADED,
+                part,
+                null,
+                0,
+                0));
         }
     }
 
@@ -391,25 +399,25 @@ public class CacheGroupInfrastructure {
         boolean hasOldVal,
         boolean keepBinary
     ) {
-        synchronized (caches) {
-            for (int i = 0; i < caches.size(); i++) {
-                GridCacheContext cctx = caches.get(i);
-
-                cctx.events().addEvent(part,
-                    key,
-                    evtNodeId,
-                    (IgniteUuid)null,
-                    null,
-                    type,
-                    newVal,
-                    hasNewVal,
-                    oldVal,
-                    hasOldVal,
-                    null,
-                    null,
-                    null,
-                    keepBinary);
-            }
+        List<GridCacheContext> caches = this.caches;
+
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
+
+            cctx.events().addEvent(part,
+                key,
+                evtNodeId,
+                (IgniteUuid)null,
+                null,
+                type,
+                newVal,
+                hasNewVal,
+                oldVal,
+                hasOldVal,
+                null,
+                null,
+                null,
+                keepBinary);
         }
     }
 
@@ -620,26 +628,26 @@ public class CacheGroupInfrastructure {
      * @return {@code True} if group contains caches.
      */
     boolean hasCaches() {
-        synchronized (caches) {
-            return !caches.isEmpty();
-        }
+        List<GridCacheContext> caches = this.caches;
+
+        return !caches.isEmpty();
     }
 
     /**
      * @param part Partition ID.
      */
     public void onPartitionEvicted(int part) {
-        synchronized (caches) {
-            for (int i = 0; i < caches.size(); i++) {
-                GridCacheContext cctx = caches.get(i);
+        List<GridCacheContext> caches = this.caches;
 
-                if (cctx.isDrEnabled())
-                    cctx.dr().partitionEvicted(part);
+        for (int i = 0; i < caches.size(); i++) {
+            GridCacheContext cctx = caches.get(i);
 
-                cctx.continuousQueries().onPartitionEvicted(part);
+            if (cctx.isDrEnabled())
+                cctx.dr().partitionEvicted(part);
 
-                cctx.dataStructures().onPartitionEvicted(part);
-            }
+            cctx.continuousQueries().onPartitionEvicted(part);
+
+            cctx.dataStructures().onPartitionEvicted(part);
         }
     }
 


[7/8] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-5075

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fc01683
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fc01683
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fc01683

Branch: refs/heads/ignite-5075-pds
Commit: 7fc01683a7e0995d41ad8d900767d6e48443d8b2
Parents: 19ed098 ca94cf3
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 23 15:05:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 23 15:05:52 2017 +0300

----------------------------------------------------------------------
 .../store/cassandra/CassandraCacheStore.java    |  15 +-
 .../ignite/tests/IgnitePersistentStoreTest.java |  62 ++++-
 .../persistence/loadall_blob/ignite-config.xml  |  90 +++++++
 .../loadall_blob/persistence-settings.xml       |  29 +++
 .../store/jdbc/CacheAbstractJdbcStore.java      |   6 +-
 .../managers/communication/GridIoManager.java   |  30 ++-
 .../communication/GridIoMessageFactory.java     |   2 +
 .../platform/cache/PlatformCache.java           |  28 ++-
 .../cache/query/GridCacheTwoStepQuery.java      |  84 ++-----
 .../processors/cache/query/QueryTable.java      | 164 +++++++++++++
 .../processors/query/h2/IgniteH2Indexing.java   |  84 +++----
 .../query/h2/opt/GridH2IndexBase.java           |   2 +-
 .../processors/query/h2/opt/GridH2Table.java    |  28 ++-
 .../query/h2/sql/GridSqlQuerySplitter.java      |  21 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  15 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 233 +++++++++----------
 .../h2/twostep/msg/GridH2QueryRequest.java      |  13 +-
 .../twostep/msg/GridH2ValueMessageFactory.java  |   4 +
 .../core/include/ignite/cache/query/query_sql.h |   2 +
 .../ignite/cache/query/query_sql_fields.h       |   3 +
 .../Cache/Query/CacheLinqTest.cs                |  36 ++-
 .../Cache/Query/CacheQueriesTest.cs             |  60 ++++-
 .../Cache/Query/SqlFieldsQuery.cs               |  29 ++-
 .../Apache.Ignite.Core/Cache/Query/SqlQuery.cs  |  31 +++
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  |   3 +
 .../Apache.Ignite.Linq/CacheExtensions.cs       |  10 +
 .../Impl/CacheFieldsQueryExecutor.cs            |  42 ++--
 .../Apache.Ignite.Linq/Impl/CacheQueryable.cs   |   3 +-
 .../dotnet/Apache.Ignite.Linq/QueryOptions.cs   |  23 ++
 .../spring/SpringTransactionManager.java        |  13 ++
 .../GridSpringTransactionManagerSelfTest.java   |  45 ++++
 31 files changed, 892 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7fc01683/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/7fc01683/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8286b45,75914ef..bba5fca
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@ -1163,10 -1143,11 +1143,11 @@@ public class GridReduceQueryExecutor 
                          continue; // Skip unmapped partitions.
  
                      if (F.isEmpty(owners)) {
 -                        if (!F.isEmpty(dataNodes(extraCctx.name(), NONE)))
 +                        if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE)))
                              return null; // Retry.
  
-                         throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]");
+                         throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() +
+                             ", part=" + p + "]");
                      }
  
                      if (partLocs[p] == null)


[2/8] ignite git commit: Fixed "IGNITE-4205 CassandraCacheStore should start IgniteThread threads in loadCache() method"

Posted by sb...@apache.org.
Fixed "IGNITE-4205 CassandraCacheStore should start IgniteThread threads in loadCache() method"

Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/561d2cf0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/561d2cf0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/561d2cf0

Branch: refs/heads/ignite-5075-pds
Commit: 561d2cf048be59524acfbe2ac064d8b633b99c37
Parents: f74d51c
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon May 22 14:30:30 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon May 22 14:30:30 2017 +0300

----------------------------------------------------------------------
 .../store/cassandra/CassandraCacheStore.java    | 15 +++-
 .../ignite/tests/IgnitePersistentStoreTest.java | 62 +++++++++++++-
 .../persistence/loadall_blob/ignite-config.xml  | 90 ++++++++++++++++++++
 .../loadall_blob/persistence-settings.xml       | 29 +++++++
 .../store/jdbc/CacheAbstractJdbcStore.java      |  6 +-
 5 files changed, 198 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 98c8b40..b438946 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -23,16 +23,17 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import javax.cache.Cache;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.store.CacheStore;
@@ -52,7 +53,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.logger.NullLogger;
 import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.thread.IgniteThreadFactory;
 
 /**
  * Implementation of {@link CacheStore} backed by Cassandra database.
@@ -64,6 +67,14 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
     /** Buffer to store mutations performed withing transaction. */
     private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER";
 
+    /** Thread name. */
+    private static final String CACHE_LOADER_THREAD_NAME = "cassandra-cache-loader";
+
+    /** Auto-injected ignite instance. */
+    @SuppressWarnings("unused")
+    @IgniteInstanceResource
+    private Ignite ignite;
+
     /** Auto-injected store session. */
     @SuppressWarnings("unused")
     @CacheStoreSessionResource
@@ -109,7 +120,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         Collection<Future<?>> futs = new ArrayList<>(args.length);
 
         try {
-            pool = Executors.newFixedThreadPool(maxPoolSize);
+            pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME));
 
             CassandraSession ses = getCassandraSession();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
index c8c7139..feccb24 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
@@ -25,9 +25,11 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.tests.pojos.Person;
@@ -42,9 +44,9 @@ import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.log4j.Logger;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.Assert;
 import org.springframework.core.io.ClassPathResource;
 
 /**
@@ -247,6 +249,34 @@ public class IgnitePersistentStoreTest {
 
     /** */
     @Test
+    public void blobBinaryLoadCacheTest() {
+        Ignition.stopAll(true);
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
+            IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");
+
+            assert ignite.configuration().getMarshaller() instanceof BinaryMarshaller;
+
+            personCache.put(1L, new PojoPerson(1, "name"));
+
+            assert personCache.withKeepBinary().get(1L) instanceof BinaryObject;
+        }
+
+        Ignition.stopAll(true);
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
+            IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");
+
+            personCache.loadCache(null, null);
+
+            PojoPerson person = personCache.get(1L);
+
+            LOGGER.info("loadCache tests passed");
+        }
+    }
+
+    /** */
+    @Test
     public void pojoStrategyTest() {
         Ignition.stopAll(true);
 
@@ -673,4 +703,34 @@ public class IgnitePersistentStoreTest {
                 " concurrency and " + isolation + " isolation level");
         LOGGER.info("-----------------------------------------------------------------------------------");
     }
+
+    /** */
+    public static class PojoPerson {
+        /** */
+        private int id;
+
+        /** */
+        private String name;
+
+        /** */
+        public PojoPerson() {
+            // No-op.
+        }
+
+        /** */
+        public PojoPerson(int id, String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /** */
+        public int getId() {
+            return id;
+        }
+
+        /** */
+        public String getName() {
+            return name;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
new file mode 100644
index 0000000..115e263
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+    <!-- Cassandra connection settings -->
+    <import resource="classpath:org/apache/ignite/tests/cassandra/connection-settings.xml"/>
+
+    <!-- Persistence settings for 'cache2' -->
+    <bean id="cache2_persistence_settings"
+          class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
+        <constructor-arg type="org.springframework.core.io.Resource"
+                         value="classpath:org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml"/>
+    </bean>
+
+    <!-- Ignite configuration -->
+    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+
+        <property name="marshaller">
+            <bean class="org.apache.ignite.internal.binary.BinaryMarshaller"/>
+        </property>
+
+        <property name="binaryConfiguration">
+            <bean class="org.apache.ignite.configuration.BinaryConfiguration">
+                <property name="compactFooter" value="false"/>
+            </bean>
+        </property>
+
+        <property name="cacheConfiguration">
+            <list>
+                <!-- Configuring persistence for "cache2" cache -->
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="cache2"/>
+                    <property name="readThrough" value="true"/>
+                    <property name="writeThrough" value="true"/>
+                    <property name="storeKeepBinary" value="true"/>
+                    <property name="cacheStoreFactory">
+                        <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
+                            <property name="dataSourceBean" value="cassandraAdminDataSource"/>
+                            <property name="persistenceSettingsBean" value="cache2_persistence_settings"/>
+                        </bean>
+                    </property>
+                </bean>
+            </list>
+        </property>
+
+        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <!--
+                        Ignite provides several options for automatic discovery that can be used
+                        instead of static IP based discovery. For information on all options refer
+                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
+                    -->
+                    <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
+                        <property name="addresses">
+                            <list>
+                                <!-- In distributed environment, replace with actual host IP address. -->
+                                <value>127.0.0.1:47500..47509</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
new file mode 100644
index 0000000..e872201
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
@@ -0,0 +1,29 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<persistence keyspace="test1" table="blob_test3">
+    <!-- By default Java standard serialization is going to be used -->
+    <keyPersistence class="java.lang.Long"
+                    strategy="BLOB"
+                    column="key"/>
+
+    <!-- Java serialization explicitly specified to be used -->
+    <valuePersistence class="org.apache.ignite.tests.pojos.Person"
+                      strategy="BLOB"
+                      serializer="org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer"
+                      column="value"/>
+</persistence>

http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 46e9022..b1ec38d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -69,6 +69,7 @@ import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.resources.CacheStoreSessionResource;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.thread.IgniteThreadFactory;
 import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -121,6 +122,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
     /** Connection attribute property name. */
     protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
 
+    /** Thread name. */
+    private static final String CACHE_LOADER_THREAD_NAME = "jdbc-cache-loader";
+
     /** Built in Java types names. */
     protected static final Collection<String> BUILT_IN_TYPES = new HashSet<>();
 
@@ -680,7 +684,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
         String cacheName = session().cacheName();
 
         try {
-            pool = Executors.newFixedThreadPool(maxPoolSize);
+            pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME));
 
             Collection<Future<?>> futs = new ArrayList<>();
 


[4/8] ignite git commit: IGNITE-5257 .NET: SQL query timeouts

Posted by sb...@apache.org.
IGNITE-5257 .NET: SQL query timeouts

This closes #1985


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca94cf3d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca94cf3d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca94cf3d

Branch: refs/heads/ignite-5075-pds
Commit: ca94cf3d6c708218ef22aa40c07c436c75360bc6
Parents: c4bb996
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon May 22 17:45:10 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon May 22 17:45:10 2017 +0300

----------------------------------------------------------------------
 .../platform/cache/PlatformCache.java           | 28 +++++++--
 .../core/include/ignite/cache/query/query_sql.h |  2 +
 .../ignite/cache/query/query_sql_fields.h       |  3 +
 .../Cache/Query/CacheLinqTest.cs                | 36 ++++++++++--
 .../Cache/Query/CacheQueriesTest.cs             | 60 +++++++++++++++++---
 .../Cache/Query/SqlFieldsQuery.cs               | 29 +++++++++-
 .../Apache.Ignite.Core/Cache/Query/SqlQuery.cs  | 31 ++++++++++
 .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs  |  3 +
 .../Apache.Ignite.Linq/CacheExtensions.cs       | 10 ++++
 .../Impl/CacheFieldsQueryExecutor.cs            | 42 ++++++--------
 .../Apache.Ignite.Linq/Impl/CacheQueryable.cs   |  3 +-
 .../dotnet/Apache.Ignite.Linq/QueryOptions.cs   | 23 ++++++++
 12 files changed, 224 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index c61b75e..13a8ca1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -1282,8 +1282,16 @@ public class PlatformCache extends PlatformAbstractTarget {
         Object[] args = readQueryArgs(reader);
 
         boolean distrJoins = reader.readBoolean();
-
-        return new SqlQuery(typ, sql).setPageSize(pageSize).setArgs(args).setLocal(loc).setDistributedJoins(distrJoins);
+        int timeout = reader.readInt();
+        boolean replicated = reader.readBoolean();
+
+        return new SqlQuery(typ, sql)
+                .setPageSize(pageSize)
+                .setArgs(args)
+                .setLocal(loc)
+                .setDistributedJoins(distrJoins)
+                .setTimeout(timeout, TimeUnit.MILLISECONDS)
+                .setReplicatedOnly(replicated);
     }
 
     /**
@@ -1301,9 +1309,19 @@ public class PlatformCache extends PlatformAbstractTarget {
 
         boolean distrJoins = reader.readBoolean();
         boolean enforceJoinOrder = reader.readBoolean();
-
-        return new SqlFieldsQuery(sql).setPageSize(pageSize).setArgs(args).setLocal(loc)
-            .setDistributedJoins(distrJoins).setEnforceJoinOrder(enforceJoinOrder);
+        int timeout = reader.readInt();
+        boolean replicated = reader.readBoolean();
+        boolean collocated = reader.readBoolean();
+
+        return new SqlFieldsQuery(sql)
+                .setPageSize(pageSize)
+                .setArgs(args)
+                .setLocal(loc)
+                .setDistributedJoins(distrJoins)
+                .setEnforceJoinOrder(enforceJoinOrder)
+                .setTimeout(timeout, TimeUnit.MILLISECONDS)
+                .setReplicatedOnly(replicated)
+                .setCollocated(collocated);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h
index d733476..eb0606a 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h
@@ -272,6 +272,8 @@ namespace ignite
                         (*it)->Write(writer);
 
                     writer.WriteBool(distributedJoins);
+                    writer.WriteInt32(0);  // Timeout, ms
+                    writer.WriteBool(false);  // ReplicatedOnly
                 }
 
             private:

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
index 954cf43..db26fc4 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
@@ -295,6 +295,9 @@ namespace ignite
 
                     writer.WriteBool(distributedJoins);
                     writer.WriteBool(enforceJoinOrder);
+                    writer.WriteInt32(0);  // Timeout, ms
+                    writer.WriteBool(false);  // ReplicatedOnly
+                    writer.WriteBool(false);  // Colocated
                 }
 
             private:

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs
index 265a149..cb3fece 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs
@@ -1273,7 +1273,10 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
             {
                 Local = true,
                 PageSize = 999,
-                EnforceJoinOrder = true
+                EnforceJoinOrder = true,
+                Timeout = TimeSpan.FromSeconds(2.5),
+                ReplicatedOnly = true,
+                Colocated = true
             }).Where(x => x.Key > 10);
 
             Assert.AreEqual(cache.Name, query.CacheName);
@@ -1288,12 +1291,16 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
             Assert.AreEqual(999, fq.PageSize);
             Assert.IsFalse(fq.EnableDistributedJoins);
             Assert.IsTrue(fq.EnforceJoinOrder);
+            Assert.IsTrue(fq.ReplicatedOnly);
+            Assert.IsTrue(fq.Colocated);
+            Assert.AreEqual(TimeSpan.FromSeconds(2.5), fq.Timeout);
 
             var str = query.ToString();
             Assert.AreEqual("CacheQueryable [CacheName=person_org, TableName=Person, Query=SqlFieldsQuery " +
                             "[Sql=select _T0._key, _T0._val from \"person_org\".Person as _T0 where " +
                             "(_T0._key > ?), Arguments=[10], " +
-                            "Local=True, PageSize=999, EnableDistributedJoins=False, EnforceJoinOrder=True]]", str);
+                            "Local=True, PageSize=999, EnableDistributedJoins=False, EnforceJoinOrder=True, " +
+                            "Timeout=00:00:02.5000000, ReplicatedOnly=True, Colocated=True]]", str);
 
             // Check fields query
             var fieldsQuery = (ICacheQueryable) cache.AsCacheQueryable().Select(x => x.Value.Name);
@@ -1311,7 +1318,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
             str = fieldsQuery.ToString();
             Assert.AreEqual("CacheQueryable [CacheName=person_org, TableName=Person, Query=SqlFieldsQuery " +
                             "[Sql=select _T0.Name from \"person_org\".Person as _T0, Arguments=[], Local=False, " +
-                            "PageSize=1024, EnableDistributedJoins=False, EnforceJoinOrder=False]]", str);
+                            "PageSize=1024, EnableDistributedJoins=False, EnforceJoinOrder=False, " +
+                            "Timeout=00:00:00, ReplicatedOnly=False, Colocated=False]]", str);
             
             // Check distributed joins flag propagation
             var distrQuery = cache.AsCacheQueryable(new QueryOptions {EnableDistributedJoins = true})
@@ -1326,7 +1334,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
                             "[Sql=select _T0._key, _T0._val from \"person_org\".Person as _T0 where " +
                             "(((_T0._key > ?) and (_T0.age1 > ?)) " +
                             "and (_T0.Name like \'%\' || ? || \'%\') ), Arguments=[10, 20, x], Local=False, " +
-                            "PageSize=1024, EnableDistributedJoins=True, EnforceJoinOrder=False]]", str);
+                            "PageSize=1024, EnableDistributedJoins=True, EnforceJoinOrder=False, " +
+                            "Timeout=00:00:00, ReplicatedOnly=False, Colocated=False]]", str);
         }
 
         /// <summary>
@@ -1396,6 +1405,25 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
         }
 
         /// <summary>
+        /// Tests the query timeout.
+        /// </summary>
+        [Test]
+        public void TestTimeout()
+        {
+            var persons = GetPersonCache().AsCacheQueryable(new QueryOptions
+            {
+                Timeout = TimeSpan.FromMilliseconds(1),
+                EnableDistributedJoins = true
+            });
+
+            // ReSharper disable once ReturnValueOfPureMethodIsNotUsed
+            var ex = Assert.Throws<CacheException>(() =>
+                persons.SelectMany(p => GetRoleCache().AsCacheQueryable()).ToArray());
+
+            Assert.IsTrue(ex.ToString().Contains("QueryCancelledException: The query was cancelled while executing."));
+        }
+
+        /// <summary>
         /// Gets the person cache.
         /// </summary>
         /// <returns></returns>

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
index 01277e1..60d2fdf 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
@@ -105,8 +105,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
 
             for (int i = 0; i < GridCnt; i++)
             {
-                for (int j = 0; j < MaxItemCnt; j++)
-                    cache.Remove(j);
+                cache.Clear();
 
                 Assert.IsTrue(cache.IsEmpty());
             }
@@ -352,9 +351,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
             // 2. Validate results.
             var qry = new SqlQuery(typeof(QueryPerson), "age < 50", loc)
             {
-                EnableDistributedJoins = distrJoin
+                EnableDistributedJoins = distrJoin,
+                ReplicatedOnly = false,
+                Timeout = TimeSpan.FromSeconds(3)
             };
 
+            Assert.AreEqual(string.Format("SqlQuery [Sql=age < 50, Arguments=[], Local={0}, " +
+                                          "PageSize=1024, EnableDistributedJoins={1}, Timeout={2}, " +
+                                          "ReplicatedOnly=False]", loc, distrJoin, qry.Timeout), qry.ToString());
+
             ValidateQueryResults(cache, qry, exp, keepBinary);
         }
 
@@ -376,7 +381,10 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
             var qry = new SqlFieldsQuery("SELECT name, age FROM QueryPerson WHERE age < 50", loc)
             {
                 EnableDistributedJoins = distrJoin,
-                EnforceJoinOrder = enforceJoinOrder
+                EnforceJoinOrder = enforceJoinOrder,
+                Colocated = !distrJoin,
+                ReplicatedOnly = false,
+                Timeout = TimeSpan.FromSeconds(2)
             };
 
             using (IQueryCursor<IList> cursor = cache.QueryFields(qry))
@@ -673,6 +681,44 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
         }
 
         /// <summary>
+        /// Tests query timeouts.
+        /// </summary>
+        [Test]
+        public void TestSqlQueryTimeout()
+        {
+            var cache = Cache();
+            PopulateCache(cache, false, 20000, x => true);
+
+            var sqlQry = new SqlQuery(typeof(QueryPerson), "WHERE age < 500 AND name like '%1%'")
+            {
+                Timeout = TimeSpan.FromMilliseconds(2)
+            };
+
+            // ReSharper disable once ReturnValueOfPureMethodIsNotUsed
+            var ex = Assert.Throws<CacheException>(() => cache.Query(sqlQry).ToArray());
+            Assert.IsTrue(ex.ToString().Contains("QueryCancelledException: The query was cancelled while executing."));
+        }
+
+        /// <summary>
+        /// Tests fields query timeouts.
+        /// </summary>
+        [Test]
+        public void TestSqlFieldsQueryTimeout()
+        {
+            var cache = Cache();
+            PopulateCache(cache, false, 20000, x => true);
+
+            var fieldsQry = new SqlFieldsQuery("SELECT * FROM QueryPerson WHERE age < 5000 AND name like '%0%'")
+            {
+                Timeout = TimeSpan.FromMilliseconds(3)
+            };
+
+            // ReSharper disable once ReturnValueOfPureMethodIsNotUsed
+            var ex = Assert.Throws<CacheException>(() => cache.QueryFields(fieldsQry).ToArray());
+            Assert.IsTrue(ex.ToString().Contains("QueryCancelledException: The query was cancelled while executing."));
+        }
+
+        /// <summary>
         /// Validates the query results.
         /// </summary>
         /// <param name="cache">Cache.</param>
@@ -820,7 +866,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
 
             for (var i = 0; i < cnt; i++)
             {
-                var val = rand.Next(100);
+                var val = rand.Next(cnt);
 
                 cache.Put(val, new QueryPerson(val.ToString(), val));
 
@@ -845,8 +891,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
         public QueryPerson(string name, int age)
         {
             Name = name;
-            Age = age;
-            Birthday = DateTime.UtcNow.AddYears(-age);
+            Age = age % 2000;
+            Birthday = DateTime.UtcNow.AddYears(-Age);
         }
 
         /// <summary>

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
index aab2bfe..4809574 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Core.Cache.Query
 {
+    using System;
     using System.Diagnostics.CodeAnalysis;
     using System.Linq;
 
@@ -105,6 +106,28 @@ namespace Apache.Ignite.Core.Cache.Query
         public bool EnforceJoinOrder { get; set; }
 
         /// <summary>
+        /// Gets or sets the query timeout. Query will be automatically cancelled if the execution timeout is exceeded.
+        /// Default is <see cref="TimeSpan.Zero"/>, which means no timeout.
+        /// </summary>
+        public TimeSpan Timeout { get; set; }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether this query contains only replicated tables.
+        /// This is a hint for potentially more effective execution.
+        /// </summary>
+        public bool ReplicatedOnly { get; set; }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether this query operates on colocated data.
+        /// <para />
+        /// Whenever Ignite executes a distributed query, it sends sub-queries to individual cluster members.
+        /// If you know in advance that the elements of your query selection are colocated together on the same
+        /// node and you group by colocated key (primary or affinity key), then Ignite can make significant
+        /// performance and network optimizations by grouping data on remote nodes.
+        /// </summary>
+        public bool Colocated { get; set; }
+
+        /// <summary>
         /// Returns a <see cref="string" /> that represents this instance.
         /// </summary>
         /// <returns>
@@ -115,8 +138,10 @@ namespace Apache.Ignite.Core.Cache.Query
             var args = string.Join(", ", Arguments.Select(x => x == null ? "null" : x.ToString()));
 
             return string.Format("SqlFieldsQuery [Sql={0}, Arguments=[{1}], Local={2}, PageSize={3}, " +
-                                 "EnableDistributedJoins={4}, EnforceJoinOrder={5}]", Sql, args, Local,
-                                 PageSize, EnableDistributedJoins, EnforceJoinOrder);
+                                 "EnableDistributedJoins={4}, EnforceJoinOrder={5}, Timeout={6}, ReplicatedOnly={7}" +
+                                 ", Colocated={8}]", Sql, args, Local,
+                                 PageSize, EnableDistributedJoins, EnforceJoinOrder, Timeout, ReplicatedOnly,
+                                 Colocated);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs
index 70e08b2..7d8e8fb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Cache.Query
 {
     using System;
     using System.Diagnostics.CodeAnalysis;
+    using System.Linq;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Cache;
     using Apache.Ignite.Core.Impl.Common;
@@ -108,6 +109,18 @@ namespace Apache.Ignite.Core.Cache.Query
         /// </value>
         public bool EnableDistributedJoins { get; set; }
 
+        /// <summary>
+        /// Gets or sets the query timeout. Query will be automatically cancelled if the execution timeout is exceeded.
+        /// Default is <see cref="TimeSpan.Zero"/>, which means no timeout.
+        /// </summary>
+        public TimeSpan Timeout { get; set; }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether this query contains only replicated tables.
+        /// This is a hint for potentially more effective execution.
+        /// </summary>
+        public bool ReplicatedOnly { get; set; }
+
         /** <inheritDoc /> */
         internal override void Write(BinaryWriter writer, bool keepBinary)
         {
@@ -126,6 +139,8 @@ namespace Apache.Ignite.Core.Cache.Query
             WriteQueryArgs(writer, Arguments);
 
             writer.WriteBoolean(EnableDistributedJoins);
+            writer.WriteInt((int) Timeout.TotalMilliseconds);
+            writer.WriteBoolean(ReplicatedOnly);
         }
 
         /** <inheritDoc /> */
@@ -133,5 +148,21 @@ namespace Apache.Ignite.Core.Cache.Query
         {
             get { return CacheOp.QrySql; }
         }
+
+        /// <summary>
+        /// Returns a <see cref="string" /> that represents this instance.
+        /// </summary>
+        /// <returns>
+        /// A <see cref="string" /> that represents this instance.
+        /// </returns>
+        public override string ToString()
+        {
+            var args = string.Join(", ", Arguments.Select(x => x == null ? "null" : x.ToString()));
+
+            return string.Format("SqlQuery [Sql={0}, Arguments=[{1}], Local={2}, PageSize={3}, " +
+                                 "EnableDistributedJoins={4}, Timeout={5}, ReplicatedOnly={6}]", Sql, args, Local,
+                PageSize, EnableDistributedJoins, Timeout, ReplicatedOnly);
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index 749409c..95787eb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -1086,6 +1086,9 @@ namespace Apache.Ignite.Core.Impl.Cache
 
                 writer.WriteBoolean(qry.EnableDistributedJoins);
                 writer.WriteBoolean(qry.EnforceJoinOrder);
+                writer.WriteInt((int) qry.Timeout.TotalMilliseconds);
+                writer.WriteBoolean(qry.ReplicatedOnly);
+                writer.WriteBoolean(qry.Colocated);
             });
         
             return new FieldsQueryCursor<T>(cursor, Marshaller, _flagKeepBinary, readerFunc);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs
index 4b536f4..2c609c6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Linq
     using System.Linq;
     using Apache.Ignite.Core.Cache;
     using Apache.Ignite.Core.Cache.Configuration;
+    using Apache.Ignite.Core.Impl.Common;
     using Apache.Ignite.Linq.Impl;
 
     /// <summary>
@@ -43,6 +44,8 @@ namespace Apache.Ignite.Linq
         public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>(
             this ICache<TKey, TValue> cache)
         {
+            IgniteArgumentCheck.NotNull(cache, "cache");
+
             return cache.AsCacheQueryable(false, null);
         }
 
@@ -64,6 +67,8 @@ namespace Apache.Ignite.Linq
         public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>(
             this ICache<TKey, TValue> cache, bool local)
         {
+            IgniteArgumentCheck.NotNull(cache, "cache");
+
             return cache.AsCacheQueryable(local, null);
         }
 
@@ -92,6 +97,8 @@ namespace Apache.Ignite.Linq
         public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>(
             this ICache<TKey, TValue> cache, bool local, string tableName)
         {
+            IgniteArgumentCheck.NotNull(cache, "cache");
+
             return cache.AsCacheQueryable(new QueryOptions {Local = local, TableName = tableName});
         }
 
@@ -114,6 +121,9 @@ namespace Apache.Ignite.Linq
         public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>(
             this ICache<TKey, TValue> cache, QueryOptions queryOptions)
         {
+            IgniteArgumentCheck.NotNull(cache, "cache");
+            IgniteArgumentCheck.NotNull(queryOptions, "queryOptions");
+
             return new CacheQueryable<TKey, TValue>(cache, queryOptions);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs
index 8dfddc7..27082bd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs
@@ -38,41 +38,26 @@ namespace Apache.Ignite.Linq.Impl
     {
         /** */
         private readonly ICacheInternal _cache;
+        
+        /** */
+        private readonly QueryOptions _options;
 
         /** */
         private static readonly CopyOnWriteConcurrentDictionary<ConstructorInfo, object> CtorCache =
             new CopyOnWriteConcurrentDictionary<ConstructorInfo, object>();
 
-        /** */
-        private readonly bool _local;
-
-        /** */
-        private readonly int _pageSize;
-
-        /** */
-        private readonly bool _enableDistributedJoins;
-
-        /** */
-        private readonly bool _enforceJoinOrder;
-
         /// <summary>
         /// Initializes a new instance of the <see cref="CacheFieldsQueryExecutor" /> class.
         /// </summary>
         /// <param name="cache">The executor function.</param>
-        /// <param name="local">Local flag.</param>
-        /// <param name="pageSize">Size of the page.</param>
-        /// <param name="enableDistributedJoins">Distributed joins flag.</param>
-        /// <param name="enforceJoinOrder">Enforce join order flag.</param>
-        public CacheFieldsQueryExecutor(ICacheInternal cache, bool local, int pageSize, bool enableDistributedJoins,
-            bool enforceJoinOrder)
+        /// <param name="options">Query options.</param>
+        public CacheFieldsQueryExecutor(ICacheInternal cache, QueryOptions options)
         {
             Debug.Assert(cache != null);
+            Debug.Assert(options != null);
 
             _cache = cache;
-            _local = local;
-            _pageSize = pageSize;
-            _enableDistributedJoins = enableDistributedJoins;
-            _enforceJoinOrder = enforceJoinOrder;
+            _options = options;
         }
 
         /** <inheritdoc /> */
@@ -252,11 +237,16 @@ namespace Apache.Ignite.Linq.Impl
         /// </summary>
         internal SqlFieldsQuery GetFieldsQuery(string text, object[] args)
         {
-            return new SqlFieldsQuery(text, _local, args)
+            return new SqlFieldsQuery(text)
             {
-                EnableDistributedJoins = _enableDistributedJoins,
-                PageSize = _pageSize,
-                EnforceJoinOrder = _enforceJoinOrder
+                EnableDistributedJoins = _options.EnableDistributedJoins,
+                PageSize = _options.PageSize,
+                EnforceJoinOrder = _options.EnforceJoinOrder,
+                Timeout = _options.Timeout,
+                ReplicatedOnly = _options.ReplicatedOnly,
+                Colocated = _options.Colocated,
+                Local = _options.Local,
+                Arguments = args
             };
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs
index 7372776..e271363 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs
@@ -33,8 +33,7 @@ namespace Apache.Ignite.Linq.Impl
         /// <param name="queryOptions">The query options.</param>
         public CacheQueryable(ICache<TKey, TValue> cache, QueryOptions queryOptions)
             : base(new CacheFieldsQueryProvider(CacheQueryParser.Instance,
-                new CacheFieldsQueryExecutor((ICacheInternal) cache, queryOptions.Local, queryOptions.PageSize,
-                    queryOptions.EnableDistributedJoins, queryOptions.EnforceJoinOrder), 
+                new CacheFieldsQueryExecutor((ICacheInternal) cache, queryOptions), 
                 cache.Ignite, cache.GetConfiguration(), queryOptions.TableName, typeof(TValue)))
         {
             // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs
index c70152e..17b3705 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs
@@ -17,6 +17,7 @@
 
 namespace Apache.Ignite.Linq
 {
+    using System;
     using System.ComponentModel;
     using Apache.Ignite.Core.Cache.Configuration;
     using Apache.Ignite.Core.Cache.Query;
@@ -87,5 +88,27 @@ namespace Apache.Ignite.Linq
         ///   <c>true</c> if join order should be enforced; otherwise, <c>false</c>.
         /// </value>
         public bool EnforceJoinOrder { get; set; }
+
+        /// <summary>
+        /// Gets or sets the query timeout. Query will be automatically cancelled if the execution timeout is exceeded.
+        /// Default is <see cref="TimeSpan.Zero"/>, which means no timeout.
+        /// </summary>
+        public TimeSpan Timeout { get; set; }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether this query contains only replicated tables.
+        /// This is a hint for potentially more effective execution.
+        /// </summary>
+        public bool ReplicatedOnly { get; set; }
+
+        /// <summary>
+        /// Gets or sets a value indicating whether this query operates on colocated data.
+        /// <para />
+        /// Whenever Ignite executes a distributed query, it sends sub-queries to individual cluster members.
+        /// If you know in advance that the elements of your query selection are colocated together on the same
+        /// node and you group by colocated key (primary or affinity key), then Ignite can make significant
+        /// performance and network optimizations by grouping data on remote nodes.
+        /// </summary>
+        public bool Colocated { get; set; }
     }
 }