You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/23 12:09:49 UTC
[1/8] ignite git commit: IGNITE-5054: SQL: Simplified query
descriptor,
partially removed dependencies on 1-to-1 cache-schema dependency. This closes
#1962.
Repository: ignite
Updated Branches:
refs/heads/ignite-5075-pds aaf9f45b2 -> b86104be7
IGNITE-5054: SQL: Simplified query descriptor, partially removed dependencies on 1-to-1 cache-schema dependency. This closes #1962.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f74d51cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f74d51cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f74d51cb
Branch: refs/heads/ignite-5075-pds
Commit: f74d51cbf9a62858718c5d04b0857a3b0ef32c65
Parents: 992c976
Author: devozerov <vo...@gridgain.com>
Authored: Mon May 22 11:43:14 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon May 22 11:43:14 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 2 +
.../cache/query/GridCacheTwoStepQuery.java | 84 ++-----
.../processors/cache/query/QueryTable.java | 164 +++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 84 +++----
.../query/h2/opt/GridH2IndexBase.java | 2 +-
.../processors/query/h2/opt/GridH2Table.java | 28 ++-
.../query/h2/sql/GridSqlQuerySplitter.java | 21 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 15 +-
.../h2/twostep/GridReduceQueryExecutor.java | 233 +++++++++----------
.../h2/twostep/msg/GridH2QueryRequest.java | 13 +-
.../twostep/msg/GridH2ValueMessageFactory.java | 4 +
11 files changed, 385 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 17e4a01..753d8af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -178,6 +178,8 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ // -54 is reserved for SQL.
+
case -53:
msg = new SchemaOperationStatusMessage();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 0e31dc0..9e9a875 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache.query;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -49,13 +48,7 @@ public class GridCacheTwoStepQuery {
private String originalSql;
/** */
- private Collection<String> spaces;
-
- /** */
- private Set<String> schemas;
-
- /** */
- private Set<String> tbls;
+ private Set<QueryTable> tbls;
/** */
private boolean distributedJoins;
@@ -64,22 +57,17 @@ public class GridCacheTwoStepQuery {
private boolean skipMergeTbl;
/** */
- private List<Integer> caches;
-
- /** */
- private List<Integer> extraCaches;
+ private List<Integer> cacheIds;
/** */
private boolean local;
/**
* @param originalSql Original query SQL.
- * @param schemas Schema names in query.
* @param tbls Tables in query.
*/
- public GridCacheTwoStepQuery(String originalSql, Set<String> schemas, Set<String> tbls) {
+ public GridCacheTwoStepQuery(String originalSql, Set<QueryTable> tbls) {
this.originalSql = originalSql;
- this.schemas = schemas;
this.tbls = tbls;
}
@@ -157,8 +145,8 @@ public class GridCacheTwoStepQuery {
public boolean isReplicatedOnly() {
assert !mapQrys.isEmpty();
- for (int i = 0; i < mapQrys.size(); i++) {
- if (mapQrys.get(i).isPartitioned())
+ for (GridCacheSqlQuery mapQry : mapQrys) {
+ if (mapQry.isPartitioned())
return false;
}
@@ -187,31 +175,17 @@ public class GridCacheTwoStepQuery {
}
/**
- * @return Caches.
+ * @return Cache IDs.
*/
- public List<Integer> caches() {
- return caches;
+ public List<Integer> cacheIds() {
+ return cacheIds;
}
/**
- * @param caches Caches.
+ * @param cacheIds Cache IDs.
*/
- public void caches(List<Integer> caches) {
- this.caches = caches;
- }
-
- /**
- * @return Caches.
- */
- public List<Integer> extraCaches() {
- return extraCaches;
- }
-
- /**
- * @param extraCaches Caches.
- */
- public void extraCaches(List<Integer> extraCaches) {
- this.extraCaches = extraCaches;
+ public void cacheIds(List<Integer> cacheIds) {
+ this.cacheIds = cacheIds;
}
/**
@@ -222,27 +196,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @return Spaces.
- */
- public Collection<String> spaces() {
- return spaces;
- }
-
- /**
- * @param spaces Spaces.
- */
- public void spaces(Collection<String> spaces) {
- this.spaces = spaces;
- }
-
- /**
- * @return Schemas.
- */
- public Set<String> schemas() {
- return schemas;
- }
-
- /**
* @return {@code True} If query is local.
*/
public boolean isLocal() {
@@ -262,11 +215,9 @@ public class GridCacheTwoStepQuery {
public GridCacheTwoStepQuery copy() {
assert !explain;
- GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls);
+ GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, tbls);
- cp.caches = caches;
- cp.extraCaches = extraCaches;
- cp.spaces = spaces;
+ cp.cacheIds = cacheIds;
cp.rdc = rdc.copy();
cp.skipMergeTbl = skipMergeTbl;
cp.pageSize = pageSize;
@@ -279,9 +230,16 @@ public class GridCacheTwoStepQuery {
}
/**
+ * @return Nuumber of tables.
+ */
+ public int tablesCount() {
+ return tbls.size();
+ }
+
+ /**
* @return Tables.
*/
- public Set<String> tables() {
+ public Set<QueryTable> tables() {
return tbls;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java
new file mode 100644
index 0000000..54f5f03
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/QueryTable.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Query table descriptor.
+ */
+public class QueryTable implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Schema. */
+ private String schema;
+
+ /** Table. */
+ private String tbl;
+
+ /**
+ * Defalt constructor.
+ */
+ public QueryTable() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param schema Schema.
+ * @param tbl Table.
+ */
+ public QueryTable(String schema, String tbl) {
+ this.schema = schema;
+ this.tbl = tbl;
+ }
+
+ /**
+ * @return Schema.
+ */
+ public String schema() {
+ return schema;
+ }
+
+ /**
+ * @return Table.
+ */
+ public String table() {
+ return tbl;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeString("schema", schema))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeString("tbl", tbl))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ schema = reader.readString("schema");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ tbl = reader.readString("tbl");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(QueryTable.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return -54;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return 31 * (schema != null ? schema.hashCode() : 0) + (tbl != null ? tbl.hashCode() : 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (obj instanceof QueryTable) {
+ QueryTable other = (QueryTable)obj;
+
+ return F.eq(tbl, other.tbl) && F.eq(schema, other.schema);
+ }
+
+ return super.equals(obj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(QueryTable.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 599baa1..0874ddc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -88,6 +89,7 @@ import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
@@ -289,7 +291,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private GridTimeoutProcessor.CancelableTask stmtCacheCleanupTask;
- /**
+ /*
* Command in H2 prepared statement.
*/
static {
@@ -397,7 +399,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private DdlStatementsProcessor ddlProc;
/** */
- private final ConcurrentMap<String, GridH2Table> dataTables = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<QueryTable, GridH2Table> dataTables = new ConcurrentHashMap8<>();
/** Statement cache. */
private final ConcurrentHashMap<Thread, StatementCache> stmtCache = new ConcurrentHashMap<>();
@@ -1672,44 +1674,33 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- List<Integer> caches;
- List<Integer> extraCaches = null;
+ LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
// Setup spaces from schemas.
- if (!twoStepQry.schemas().isEmpty()) {
- Collection<String> spaces = new ArrayList<>(twoStepQry.schemas().size());
- caches = new ArrayList<>(twoStepQry.schemas().size() + 1);
- caches.add(cctx.cacheId());
+ assert twoStepQry != null;
- for (String schema : twoStepQry.schemas()) {
- String space0 = space(schema);
+ int tblCnt = twoStepQry.tablesCount();
- spaces.add(space0);
+ if (tblCnt > 0) {
+ caches0.add(cctx.cacheId());
- if (!F.eq(space0, space)) {
- int cacheId = CU.cacheId(space0);
+ for (QueryTable table : twoStepQry.tables()) {
+ String cacheName = cacheNameForSchemaAndTable(table.schema(), table.table());
- caches.add(cacheId);
+ int cacheId = CU.cacheId(cacheName);
- if (extraCaches == null)
- extraCaches = new ArrayList<>();
-
- extraCaches.add(cacheId);
- }
+ caches0.add(cacheId);
}
-
- twoStepQry.spaces(spaces);
- }
- else {
- caches = Collections.singletonList(cctx.cacheId());
- extraCaches = null;
}
+ else
+ caches0.add(cctx.cacheId());
//Prohibit usage indices with different numbers of segments in same query.
- checkCacheIndexSegmentation(caches);
+ List<Integer> cacheIds = new ArrayList<>(caches0);
- twoStepQry.caches(caches);
- twoStepQry.extraCaches(extraCaches);
+ checkCacheIndexSegmentation(cacheIds);
+
+ twoStepQry.cacheIds(cacheIds);
twoStepQry.local(qry.isLocal());
meta = meta(stmt.getMetaData());
@@ -1750,6 +1741,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * Get cache for schema and table.
+ *
+ * @param schemaName Schema name.
+ * @param tblName Table name.
+ * @return Cache name.
+ */
+ private String cacheNameForSchemaAndTable(String schemaName, String tblName) {
+ // TODO: This need to be changed.
+ return space(schemaName);
+ }
+
+ /**
* @throws IllegalStateException if segmented indices used with non-segmented indices.
*/
private void checkCacheIndexSegmentation(List<Integer> caches) {
@@ -2007,15 +2010,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
addInitialUserIndex(spaceName, tbl, usrIdx);
if (dataTables.putIfAbsent(h2Tbl.identifier(), h2Tbl) != null)
- throw new IllegalStateException("Table already exists: " + h2Tbl.identifier());
- }
-
- /**
- * @param identifier Table identifier.
- * @return Data table.
- */
- public GridH2Table dataTable(String identifier) {
- return dataTables.get(identifier);
+ throw new IllegalStateException("Table already exists: " + h2Tbl.identifierString());
}
/**
@@ -2026,12 +2021,17 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return Table or {@code null} if none found.
*/
public GridH2Table dataTable(String schemaName, String tblName) {
- for (GridH2Table tbl : dataTables.values()) {
- if (tbl.getSchema().getName().equals(schemaName) && tbl.getName().equals(tblName))
- return tbl;
- }
+ return dataTable(new QueryTable(schemaName, tblName));
+ }
- return null;
+ /**
+ * Find table by it's identifier.
+ *
+ * @param tbl Identifier.
+ * @return Table or {@code null} if none found.
+ */
+ public GridH2Table dataTable(QueryTable tbl) {
+ return dataTables.get(tbl);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
index 623da09..12850f4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java
@@ -132,7 +132,7 @@ public abstract class GridH2IndexBase extends BaseIndex {
log = ctx.log(getClass());
- msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifier() + '.' + getName());
+ msgTopic = new IgniteBiTuple<>(GridTopic.TOPIC_QUERY, tbl.identifierString() + '.' + getName());
msgLsnr = new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index a00ea90..37c03e3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -32,6 +32,7 @@ import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
@@ -47,7 +48,6 @@ import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.result.SortOrder;
-import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.TableBase;
import org.h2.table.TableType;
@@ -58,7 +58,6 @@ import org.jsr166.LongAdder8;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
/**
@@ -104,6 +103,12 @@ public class GridH2Table extends TableBase {
/** */
private volatile boolean rebuildFromHashInProgress;
+ /** Identifier. */
+ private final QueryTable identifier;
+
+ /** Identifier as string. */
+ private final String identifierStr;
+
/**
* Creates table.
*
@@ -149,6 +154,10 @@ public class GridH2Table extends TableBase {
this.rowFactory = rowFactory;
+ identifier = new QueryTable(getSchema().getName(), getName());
+
+ identifierStr = identifier.schema() + "." + identifier.table();
+
// Indexes must be created in the end when everything is ready.
idxs = idxsFactory.createSystemIndexes(this);
@@ -221,7 +230,7 @@ public class GridH2Table extends TableBase {
if (destroyed) {
unlock(exclusive);
- throw new IllegalStateException("Table " + identifier() + " already destroyed.");
+ throw new IllegalStateException("Table " + identifierString() + " already destroyed.");
}
if (snapshotInLock())
@@ -293,8 +302,15 @@ public class GridH2Table extends TableBase {
/**
* @return Table identifier.
*/
- public String identifier() {
- return getSchema().getName() + '.' + getName();
+ public QueryTable identifier() {
+ return identifier;
+ }
+
+ /**
+ * @return Table identifier as string.
+ */
+ public String identifierString() {
+ return identifierStr;
}
/**
@@ -352,7 +368,7 @@ public class GridH2Table extends TableBase {
*/
private void ensureNotDestroyed() {
if (destroyed)
- throw new IllegalStateException("Table " + identifier() + " already destroyed.");
+ throw new IllegalStateException("Table " + identifierString() + " already destroyed.");
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 26c6b08..9f01346 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -93,11 +94,8 @@ public class GridSqlQuerySplitter {
/** */
private int splitId = -1; // The first one will be 0.
- /** */
- private Set<String> schemas = new HashSet<>();
-
- /** */
- private Set<String> tbls = new HashSet<>();
+ /** Query tables. */
+ private Set<QueryTable> tbls = new HashSet<>();
/** */
private boolean rdcQrySimple;
@@ -224,7 +222,7 @@ public class GridSqlQuerySplitter {
}
// Setup resulting two step query and return it.
- GridCacheTwoStepQuery twoStepQry = new GridCacheTwoStepQuery(originalSql, splitter.schemas, splitter.tbls);
+ GridCacheTwoStepQuery twoStepQry = new GridCacheTwoStepQuery(originalSql, splitter.tbls);
twoStepQry.reduceQuery(splitter.rdcSqlQry);
@@ -1500,15 +1498,10 @@ public class GridSqlQuerySplitter {
if (from instanceof GridSqlTable) {
GridSqlTable tbl = (GridSqlTable)from;
- String schema = tbl.schema();
-
- boolean addSchema = tbls == null;
-
- if (tbls != null)
- addSchema = tbls.add(tbl.dataTable().identifier());
+ String schemaName = tbl.dataTable().identifier().schema();
+ String tblName = tbl.dataTable().identifier().table();
- if (addSchema && schema != null && schemas != null)
- schemas.add(schema);
+ tbls.add(new QueryTable(schemaName, tblName));
// In case of alias parent we need to replace the alias itself.
if (!prntAlias)
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 1444209..43cc230 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservabl
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
@@ -101,7 +102,7 @@ public class GridMapQueryExecutor {
/** */
private static final Field RESULT_FIELD;
- /**
+ /*
* Initialize.
*/
static {
@@ -514,7 +515,7 @@ public class GridMapQueryExecutor {
AffinityTopologyVersion topVer,
Map<UUID, int[]> partsMap,
int[] parts,
- Collection<String> tbls,
+ Collection<QueryTable> tbls,
int pageSize,
DistributedJoinMode distributedJoinMode,
boolean enforceJoinOrder,
@@ -567,14 +568,14 @@ public class GridMapQueryExecutor {
if (!F.isEmpty(tbls)) {
snapshotedTbls = new ArrayList<>(tbls.size());
- for (String identifier : tbls) {
- GridH2Table tbl = h2.dataTable(identifier);
+ for (QueryTable tbl : tbls) {
+ GridH2Table h2Tbl = h2.dataTable(tbl);
- Objects.requireNonNull(tbl, identifier);
+ Objects.requireNonNull(h2Tbl, tbl.toString());
- tbl.snapshotIndexes(qctx);
+ h2Tbl.snapshotIndexes(qctx);
- snapshotedTbls.add(tbl);
+ snapshotedTbls.add(h2Tbl);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3d81cb5..75914ef 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -83,7 +83,6 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Session;
@@ -348,19 +347,13 @@ public class GridReduceQueryExecutor {
}
/**
- * @param cctx Cache context for main space.
- * @param extraSpaces Extra spaces.
+ * @param cacheIds Cache IDs.
* @return {@code true} If preloading is active.
*/
- private boolean isPreloadingActive(final GridCacheContext<?, ?> cctx, List<Integer> extraSpaces) {
- if (hasMovingPartitions(cctx))
- return true;
-
- if (extraSpaces != null) {
- for (int i = 0; i < extraSpaces.size(); i++) {
- if (hasMovingPartitions(cacheContext(extraSpaces.get(i))))
- return true;
- }
+ private boolean isPreloadingActive(List<Integer> cacheIds) {
+ for (Integer cacheId : cacheIds) {
+ if (hasMovingPartitions(cacheContext(cacheId)))
+ return true;
}
return false;
@@ -439,17 +432,14 @@ public class GridReduceQueryExecutor {
/**
* @param isReplicatedOnly If we must only have replicated caches.
* @param topVer Topology version.
- * @param cctx Cache context for main space.
- * @param extraSpaces Extra spaces.
+ * @param cacheIds Participating cache IDs.
* @param parts Partitions.
* @return Data nodes or {@code null} if repartitioning started and we need to retry.
*/
- private Map<ClusterNode, IntArray> stableDataNodes(
- boolean isReplicatedOnly,
- AffinityTopologyVersion topVer,
- final GridCacheContext<?, ?> cctx,
- List<Integer> extraSpaces,
- int[] parts) {
+ private Map<ClusterNode, IntArray> stableDataNodes(boolean isReplicatedOnly, AffinityTopologyVersion topVer,
+ List<Integer> cacheIds, int[] parts) {
+ GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(0));
+
Map<ClusterNode, IntArray> map = stableDataNodesMap(topVer, cctx, parts);
Set<ClusterNode> nodes = map.keySet();
@@ -457,54 +447,53 @@ public class GridReduceQueryExecutor {
if (F.isEmpty(map))
throw new CacheException("Failed to find data nodes for cache: " + cctx.name());
- if (!F.isEmpty(extraSpaces)) {
- for (int i = 0; i < extraSpaces.size(); i++) {
- GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i));
+ for (int i = 1; i < cacheIds.size(); i++) {
+ GridCacheContext<?,?> extraCctx = cacheContext(cacheIds.get(i));
- String extraSpace = extraCctx.name();
+ String extraCacheName = extraCctx.name();
- if (extraCctx.isLocal())
- continue; // No consistency guaranties for local caches.
+ if (extraCctx.isLocal())
+ continue; // No consistency guaranties for local caches.
- if (isReplicatedOnly && !extraCctx.isReplicated())
- throw new CacheException("Queries running on replicated cache should not contain JOINs " +
- "with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");
+ if (isReplicatedOnly && !extraCctx.isReplicated())
+ throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+ "with partitioned tables [replicatedCache=" + cctx.name() +
+ ", partitionedCache=" + extraCacheName + "]");
- Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet();
+ Set<ClusterNode> extraNodes = stableDataNodesMap(topVer, extraCctx, parts).keySet();
- if (F.isEmpty(extraNodes))
- throw new CacheException("Failed to find data nodes for cache: " + extraSpace);
+ if (F.isEmpty(extraNodes))
+ throw new CacheException("Failed to find data nodes for cache: " + extraCacheName);
- if (isReplicatedOnly && extraCctx.isReplicated()) {
- nodes.retainAll(extraNodes);
+ if (isReplicatedOnly && extraCctx.isReplicated()) {
+ nodes.retainAll(extraNodes);
- if (map.isEmpty()) {
- if (isPreloadingActive(cctx, extraSpaces))
- return null; // Retry.
- else
- throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
- ", cache2=" + extraSpace + "]");
- }
- }
- else if (!isReplicatedOnly && extraCctx.isReplicated()) {
- if (!extraNodes.containsAll(nodes))
- if (isPreloadingActive(cctx, extraSpaces))
- return null; // Retry.
- else
- throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
- ", cache2=" + extraSpace + "]");
- }
- else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
- if (!extraNodes.equals(nodes))
- if (isPreloadingActive(cctx, extraSpaces))
- return null; // Retry.
- else
- throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
- ", cache2=" + extraSpace + "]");
+ if (map.isEmpty()) {
+ if (isPreloadingActive(cacheIds))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
+ ", cache2=" + extraCacheName + "]");
}
- else
- throw new IllegalStateException();
}
+ else if (!isReplicatedOnly && extraCctx.isReplicated()) {
+ if (!extraNodes.containsAll(nodes))
+ if (isPreloadingActive(cacheIds))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
+ ", cache2=" + extraCacheName + "]");
+ }
+ else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
+ if (!extraNodes.equals(nodes))
+ if (isPreloadingActive(cacheIds))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
+ ", cache2=" + extraCacheName + "]");
+ }
+ else
+ throw new IllegalStateException();
}
return map;
@@ -537,8 +526,6 @@ public class GridReduceQueryExecutor {
final boolean isReplicatedOnly = qry.isReplicatedOnly();
// Fail if all caches are replicated and explicit partitions are set.
-
-
for (int attempt = 0;; attempt++) {
if (attempt != 0) {
try {
@@ -561,7 +548,7 @@ public class GridReduceQueryExecutor {
AffinityTopologyVersion topVer = h2.readyTopologyVersion();
- List<Integer> extraSpaces = qry.extraCaches();
+ List<Integer> cacheIds = qry.cacheIds();
Collection<ClusterNode> nodes = null;
@@ -572,29 +559,29 @@ public class GridReduceQueryExecutor {
Map<ClusterNode, IntArray> qryMap = null;
// Partitions are not supported for queries over all replicated caches.
- if (cctx.isReplicated() && parts != null) {
- boolean failIfReplicatedOnly = true;
+ if (parts != null) {
+ boolean replicatedOnly = true;
- for (Integer cacheId : extraSpaces) {
+ for (Integer cacheId : cacheIds) {
if (!cacheContext(cacheId).isReplicated()) {
- failIfReplicatedOnly = false;
+ replicatedOnly = false;
break;
}
}
- if (failIfReplicatedOnly)
+ if (replicatedOnly)
throw new CacheException("Partitions are not supported for replicated caches");
}
if (qry.isLocal())
nodes = singletonList(ctx.discovery().localNode());
else {
- if (isPreloadingActive(cctx, extraSpaces)) {
+ if (isPreloadingActive(cacheIds)) {
if (isReplicatedOnly)
- nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
+ nodes = replicatedUnstableDataNodes(cacheIds);
else {
- partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
+ partsMap = partitionedUnstableDataNodes(cacheIds);
if (partsMap != null) {
qryMap = narrowForQuery(partsMap, parts);
@@ -602,8 +589,9 @@ public class GridReduceQueryExecutor {
nodes = qryMap == null ? null : qryMap.keySet();
}
}
- } else {
- qryMap = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces, parts);
+ }
+ else {
+ qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts);
if (qryMap != null)
nodes = qryMap.keySet();
@@ -633,7 +621,7 @@ public class GridReduceQueryExecutor {
final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 1 :
- findFirstPartitioned(cctx, extraSpaces).config().getQueryParallelism();
+ findFirstPartitioned(cacheIds).config().getQueryParallelism();
int replicatedQrysCnt = 0;
@@ -731,7 +719,7 @@ public class GridReduceQueryExecutor {
.requestId(qryReqId)
.topologyVersion(topVer)
.pageSize(r.pageSize)
- .caches(qry.caches())
+ .caches(qry.cacheIds())
.tables(distributedJoins ? qry.tables() : null)
.partitions(convert(partsMap))
.queries(mapQrys)
@@ -873,22 +861,18 @@ public class GridReduceQueryExecutor {
}
/**
- * @param cctx Cache context for main space.
- * @param extraSpaces Extra spaces.
+ * @param cacheIds Cache IDs.
* @return The first partitioned cache context.
*/
- private GridCacheContext<?,?> findFirstPartitioned(GridCacheContext<?,?> cctx, List<Integer> extraSpaces) {
- if (cctx.isLocal())
- throw new CacheException("Cache is LOCAL: " + cctx.name());
-
- if (!cctx.isReplicated())
- return cctx;
+ private GridCacheContext<?,?> findFirstPartitioned(List<Integer> cacheIds) {
+ for (int i = 0; i < cacheIds.size(); i++) {
+ GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i));
- for (int i = 0 ; i < extraSpaces.size(); i++) {
- GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+ if (i == 0 && cctx.isLocal())
+ throw new CacheException("Cache is LOCAL: " + cctx.name());
- if (!extraCctx.isReplicated() && !extraCctx.isLocal())
- return extraCctx;
+ if (!cctx.isReplicated() && !cctx.isLocal())
+ return cctx;
}
throw new IllegalStateException("Failed to find partitioned cache.");
@@ -997,20 +981,20 @@ public class GridReduceQueryExecutor {
/**
* Calculates data nodes for replicated caches on unstable topology.
*
- * @param cctx Cache context for main space.
- * @param extraSpaces Extra spaces.
+ * @param cacheIds Cache IDs.
* @return Collection of all data nodes owning all the caches or {@code null} for retry.
*/
- private Collection<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?, ?> cctx,
- List<Integer> extraSpaces) {
+ private Collection<ClusterNode> replicatedUnstableDataNodes(List<Integer> cacheIds) {
int i = 0;
+ GridCacheContext<?, ?> cctx = cacheContext(cacheIds.get(i++));
+
// The main cache is allowed to be partitioned.
if (!cctx.isReplicated()) {
- assert !F.isEmpty(extraSpaces): "no extra replicated caches with partitioned main cache";
+ assert cacheIds.size() > 1: "no extra replicated caches with partitioned main cache";
// Just replace the main cache with the first one extra.
- cctx = cacheContext(extraSpaces.get(i++));
+ cctx = cacheContext(cacheIds.get(i++));
assert cctx.isReplicated(): "all the extra caches must be replicated here";
}
@@ -1020,27 +1004,26 @@ public class GridReduceQueryExecutor {
if (F.isEmpty(nodes))
return null; // Retry.
- if (!F.isEmpty(extraSpaces)) {
- for (;i < extraSpaces.size(); i++) {
- GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+ for (;i < cacheIds.size(); i++) {
+ GridCacheContext<?, ?> extraCctx = cacheContext(cacheIds.get(i));
- if (extraCctx.isLocal())
- continue;
+ if (extraCctx.isLocal())
+ continue;
- if (!extraCctx.isReplicated())
- throw new CacheException("Queries running on replicated cache should not contain JOINs " +
- "with tables in partitioned caches [rCache=" + cctx.name() + ", pCache=" + extraCctx.name() + "]");
+ if (!extraCctx.isReplicated())
+ throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+ "with tables in partitioned caches [replicatedCache=" + cctx.name() + ", " +
+ "partitionedCache=" + extraCctx.name() + "]");
- Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
+ Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
- if (F.isEmpty(extraOwners))
- return null; // Retry.
+ if (F.isEmpty(extraOwners))
+ return null; // Retry.
- nodes.retainAll(extraOwners);
+ nodes.retainAll(extraOwners);
- if (nodes.isEmpty())
- return null; // Retry.
- }
+ if (nodes.isEmpty())
+ return null; // Retry.
}
return nodes;
@@ -1092,23 +1075,19 @@ public class GridReduceQueryExecutor {
/**
* Calculates partition mapping for partitioned cache on unstable topology.
*
- * @param cctx Cache context for main space.
- * @param extraSpaces Extra spaces.
+ * @param cacheIds Cache IDs.
* @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
*/
@SuppressWarnings("unchecked")
- private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(GridCacheContext<?,?> cctx,
- List<Integer> extraSpaces) {
- assert !cctx.isLocal() : cctx.name() + " must not be LOCAL";
-
+ private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(List<Integer> cacheIds) {
// If the main cache is replicated, just replace it with the first partitioned.
- cctx = findFirstPartitioned(cctx, extraSpaces);
+ GridCacheContext<?,?> cctx = findFirstPartitioned(cacheIds);
final int partsCnt = cctx.affinity().partitions();
- if (extraSpaces != null) { // Check correct number of partitions for partitioned caches.
- for (int i = 0; i < extraSpaces.size(); i++) {
- GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+ if (cacheIds.size() > 1) { // Check correct number of partitions for partitioned caches.
+ for (Integer cacheId : cacheIds) {
+ GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
if (extraCctx.isReplicated() || extraCctx.isLocal())
continue;
@@ -1117,14 +1096,15 @@ public class GridReduceQueryExecutor {
if (parts != partsCnt)
throw new CacheException("Number of partitions must be the same for correct collocation [cache1=" +
- cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() + ", parts2=" + parts + "]");
+ cctx.name() + ", parts1=" + partsCnt + ", cache2=" + extraCctx.name() +
+ ", parts2=" + parts + "]");
}
}
Set<ClusterNode>[] partLocs = new Set[partsCnt];
// Fill partition locations for main cache.
- for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
+ for (int p = 0; p < partsCnt; p++) {
List<ClusterNode> owners = cctx.topology().owners(p);
if (F.isEmpty(owners)) {
@@ -1143,11 +1123,11 @@ public class GridReduceQueryExecutor {
partLocs[p] = new HashSet<>(owners);
}
- if (extraSpaces != null) {
+ if (cacheIds.size() > 1) {
// Find owner intersections for each participating partitioned cache partition.
// We need this for logical collocation between different partitioned caches with the same affinity.
- for (int i = 0; i < extraSpaces.size(); i++) {
- GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+ for (Integer cacheId : cacheIds) {
+ GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
// This is possible if we have replaced a replicated cache with a partitioned one earlier.
if (cctx == extraCctx)
@@ -1156,7 +1136,7 @@ public class GridReduceQueryExecutor {
if (extraCctx.isReplicated() || extraCctx.isLocal())
continue;
- for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) {
+ for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) {
List<ClusterNode> owners = extraCctx.topology().owners(p);
if (partLocs[p] == UNMAPPED_PARTS)
@@ -1166,7 +1146,8 @@ public class GridReduceQueryExecutor {
if (!F.isEmpty(dataNodes(extraCctx.name(), NONE)))
return null; // Retry.
- throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]");
+ throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() +
+ ", part=" + p + "]");
}
if (partLocs[p] == null)
@@ -1181,8 +1162,8 @@ public class GridReduceQueryExecutor {
}
// Filter nodes where not all the replicated caches loaded.
- for (int i = 0; i < extraSpaces.size(); i++) {
- GridCacheContext<?,?> extraCctx = cacheContext(extraSpaces.get(i));
+ for (Integer cacheId : cacheIds) {
+ GridCacheContext<?, ?> extraCctx = cacheContext(cacheId);
if (!extraCctx.isReplicated())
continue;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index 17bb9f6..beb1ae2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -110,8 +111,8 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/** */
@GridToStringInclude
- @GridDirectCollection(String.class)
- private Collection<String> tbls;
+ @GridDirectCollection(Message.class)
+ private Collection<QueryTable> tbls;
/** */
private int timeout;
@@ -173,7 +174,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
* @param tbls Tables.
* @return {@code this}.
*/
- public GridH2QueryRequest tables(Collection<String> tbls) {
+ public GridH2QueryRequest tables(Collection<QueryTable> tbls) {
this.tbls = tbls;
return this;
@@ -182,7 +183,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/**
* @return Tables.
*/
- public Collection<String> tables() {
+ public Collection<QueryTable> tables() {
return tbls;
}
@@ -434,7 +435,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
writer.incrementState();
case 7:
- if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING))
+ if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
@@ -527,7 +528,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 7:
- tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING);
+ tbls = reader.readCollection("tbls", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f74d51cb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
index 3a825f7..18b1afb 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2ValueMessageFactory.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Iterator;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -108,6 +109,9 @@ public class GridH2ValueMessageFactory implements MessageFactory {
case -35:
return new GridH2RowRangeBounds();
+
+ case -54:
+ return new QueryTable();
}
return null;
[3/8] ignite git commit: ignite-4763 doSetRollbackOnly method to be
implemented in the SpringTransactionManager
Posted by sb...@apache.org.
ignite-4763 doSetRollbackOnly method to be implemented in the SpringTransactionManager
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c4bb996d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c4bb996d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c4bb996d
Branch: refs/heads/ignite-5075-pds
Commit: c4bb996d86f80f1009d6efb85f5c659048bb0c48
Parents: 561d2cf
Author: NSAmelchev <ns...@gmail.com>
Authored: Wed Apr 26 16:17:19 2017 +0300
Committer: agura <ag...@apache.org>
Committed: Mon May 22 16:10:40 2017 +0300
----------------------------------------------------------------------
.../spring/SpringTransactionManager.java | 13 ++++++
.../GridSpringTransactionManagerSelfTest.java | 45 ++++++++++++++++++++
2 files changed, 58 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c4bb996d/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
index 2fe8aad..d09656e 100644
--- a/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
+++ b/modules/spring/src/main/java/org/apache/ignite/transactions/spring/SpringTransactionManager.java
@@ -446,6 +446,19 @@ public class SpringTransactionManager extends AbstractPlatformTransactionManager
}
/** {@inheritDoc} */
+ @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) throws TransactionException {
+ IgniteTransactionObject txObj = (IgniteTransactionObject)status.getTransaction();
+ Transaction tx = txObj.getTransactionHolder().getTransaction();
+
+ assert tx != null;
+
+ if (status.isDebug() && log.isDebugEnabled())
+ log.debug("Setting Ignite transaction rollback-only: " + tx);
+
+ tx.setRollbackOnly();
+ }
+
+ /** {@inheritDoc} */
@Override protected void doCleanupAfterCompletion(Object transaction) {
IgniteTransactionObject txObj = (IgniteTransactionObject)transaction;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c4bb996d/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
index 8640c0b..e68ddb4 100644
--- a/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/transactions/spring/GridSpringTransactionManagerSelfTest.java
@@ -25,10 +25,14 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericXmlApplicationContext;
import org.springframework.transaction.IllegalTransactionStateException;
import org.springframework.transaction.InvalidIsolationLevelException;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionTemplate;
/**
* Spring transaction test.
@@ -144,4 +148,45 @@ public class GridSpringTransactionManagerSelfTest extends GridCommonAbstractTest
assertEquals(0, c.size());
}
+
+ /**
+ * @throws Exception If test failed.
+ */
+ public void testDoSetRollbackOnlyInExistingTransaction() throws Exception {
+ final IgniteCache<Integer, String> c = grid().cache(CACHE_NAME);
+
+ SpringTransactionManager mngr = new SpringTransactionManager();
+ mngr.setIgniteInstanceName(grid().name());
+ mngr.afterPropertiesSet();
+
+ TransactionTemplate txTmpl = new TransactionTemplate(mngr);
+
+ try {
+ txTmpl.execute(new TransactionCallback<Object>() {
+ @Override public Object doInTransaction(TransactionStatus status) {
+ c.put(1, "1");
+
+ Transaction tx = grid().transactions().tx();
+
+ assertFalse(tx.isRollbackOnly());
+
+ try {
+ service.putWithError(c, 1_000);
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+
+ assertTrue(tx.isRollbackOnly());
+
+ return null;
+ }
+ });
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+
+ assertEquals(0, c.size());
+ }
}
[5/8] ignite git commit: ignite-5075
Posted by sb...@apache.org.
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6e8c8311
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6e8c8311
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6e8c8311
Branch: refs/heads/ignite-5075-pds
Commit: 6e8c83115b8d1fdc2d0c59363369015477d63aaa
Parents: 0e49e91
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 23 13:37:22 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 23 13:37:22 2017 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/configuration/CacheConfiguration.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6e8c8311/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 5f44811..ff51a63 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -461,8 +461,10 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return grpName;
}
- public void setGroupName(String groupName) {
+ public CacheConfiguration<K, V> setGroupName(String groupName) {
this.grpName = groupName;
+
+ return this;
}
/**
[8/8] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-5075' into ignite-5075-pds
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-5075' into ignite-5075-pds
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b86104be
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b86104be
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b86104be
Branch: refs/heads/ignite-5075-pds
Commit: b86104be7f0c26bfda394d448b3276e2d5eef803
Parents: aaf9f45 7fc0168
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 23 15:08:34 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 23 15:08:34 2017 +0300
----------------------------------------------------------------------
.../store/cassandra/CassandraCacheStore.java | 15 +-
.../ignite/tests/IgnitePersistentStoreTest.java | 62 ++++-
.../persistence/loadall_blob/ignite-config.xml | 90 +++++++
.../loadall_blob/persistence-settings.xml | 29 +++
.../store/jdbc/CacheAbstractJdbcStore.java | 6 +-
.../configuration/CacheConfiguration.java | 4 +-
.../communication/GridIoMessageFactory.java | 2 +
.../cache/CacheGroupInfrastructure.java | 172 +++++++-------
.../platform/cache/PlatformCache.java | 28 ++-
.../cache/query/GridCacheTwoStepQuery.java | 84 ++-----
.../processors/cache/query/QueryTable.java | 164 +++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 84 +++----
.../query/h2/opt/GridH2IndexBase.java | 2 +-
.../processors/query/h2/opt/GridH2Table.java | 28 ++-
.../query/h2/sql/GridSqlQuerySplitter.java | 21 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 15 +-
.../h2/twostep/GridReduceQueryExecutor.java | 233 +++++++++----------
.../h2/twostep/msg/GridH2QueryRequest.java | 13 +-
.../twostep/msg/GridH2ValueMessageFactory.java | 4 +
.../core/include/ignite/cache/query/query_sql.h | 2 +
.../ignite/cache/query/query_sql_fields.h | 3 +
.../Cache/Query/CacheLinqTest.cs | 36 ++-
.../Cache/Query/CacheQueriesTest.cs | 60 ++++-
.../Cache/Query/SqlFieldsQuery.cs | 29 ++-
.../Apache.Ignite.Core/Cache/Query/SqlQuery.cs | 31 +++
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 3 +
.../Apache.Ignite.Linq/CacheExtensions.cs | 10 +
.../Impl/CacheFieldsQueryExecutor.cs | 42 ++--
.../Apache.Ignite.Linq/Impl/CacheQueryable.cs | 3 +-
.../dotnet/Apache.Ignite.Linq/QueryOptions.cs | 23 ++
.../spring/SpringTransactionManager.java | 13 ++
.../GridSpringTransactionManagerSelfTest.java | 45 ++++
32 files changed, 958 insertions(+), 398 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b86104be/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b86104be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index e717670,7c07d2d..34feeaf
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@@ -20,8 -20,8 +20,9 @@@ package org.apache.ignite.internal.proc
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+ import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b86104be/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
[6/8] ignite git commit: ignite-5075
Posted by sb...@apache.org.
ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19ed098e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19ed098e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19ed098e
Branch: refs/heads/ignite-5075-pds
Commit: 19ed098ee02667c2b6e359f6f0c39cead8b01569
Parents: 6e8c831
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 23 14:44:42 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 23 14:44:42 2017 +0300
----------------------------------------------------------------------
.../cache/CacheGroupInfrastructure.java | 172 ++++++++++---------
1 file changed, 90 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/19ed098e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 6d61ab1..7c07d2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -97,7 +98,7 @@ public class CacheGroupInfrastructure {
private boolean needsRecovery;
/** */
- private final List<GridCacheContext> caches;
+ private volatile List<GridCacheContext> caches;
/** */
private final IgniteLogger log;
@@ -235,14 +236,14 @@ public class CacheGroupInfrastructure {
* @return {@code True} if group contains cache with given name.
*/
public boolean hasCache(String cacheName) {
- synchronized (caches) {
- for (int i = 0; i < caches.size(); i++) {
- if (caches.get(i).name().equals(cacheName))
- return true;
- }
+ List<GridCacheContext> caches = this.caches;
- return false;
+ for (int i = 0; i < caches.size(); i++) {
+ if (caches.get(i).name().equals(cacheName))
+ return true;
}
+
+ return false;
}
/**
@@ -252,51 +253,58 @@ public class CacheGroupInfrastructure {
assert cacheType.userCache() == cctx.userCache() : cctx.name();
assert grpId == cctx.groupId() : cctx.name();
- synchronized (caches) {
- assert sharedGroup() || caches.isEmpty();
+ ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
- boolean add = caches.add(cctx);
+ assert sharedGroup() || caches.isEmpty();
- assert add : cctx.name();
- }
+ boolean add = caches.add(cctx);
+
+ assert add : cctx.name();
+
+ this.caches = caches;
}
/**
* @param cctx Cache context.
*/
private void removeCacheContext(GridCacheContext cctx) {
- synchronized (caches) {
- if (caches.contains(cctx)) { // It is possible cache is not added in case of errors on cache start.
- assert sharedGroup() || caches.size() == 1 : caches.size();
+ ArrayList<GridCacheContext> caches = new ArrayList<>(this.caches);
+
+ // It is possible cache was not added in case of errors on cache start.
+ for (Iterator<GridCacheContext> it = caches.iterator(); it.hasNext();) {
+ GridCacheContext next = it.next();
- boolean rmv = caches.remove(cctx);
+ if (next == cctx) {
+ assert sharedGroup() || caches.size() == 1 : caches.size();
- assert rmv : cctx.name();
+ it.remove();
}
}
+
+ this.caches = caches;
}
/**
* @return Cache context if group contains single cache.
*/
public GridCacheContext singleCacheContext() {
- synchronized (caches) {
- assert !sharedGroup() && caches.size() == 1;
+ List<GridCacheContext> caches = this.caches;
- return caches.get(0);
- }
+ assert !sharedGroup() && caches.size() == 1;
+
+ return caches.get(0);
}
/**
*
*/
public void unwindUndeploys() {
- synchronized (caches) {
- for (int i = 0; i < caches.size(); i++) {
- GridCacheContext cctx = caches.get(i);
+ List<GridCacheContext> caches = this.caches;
- cctx.deploy().unwind(cctx);
- }
+ for (int i = 0; i < caches.size(); i++) {
+ GridCacheContext cctx = caches.get(i);
+
+ cctx.deploy().unwind(cctx);
}
}
@@ -326,20 +334,20 @@ public class CacheGroupInfrastructure {
if (!eventRecordable(type))
LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type));
- synchronized (caches) {
- for (int i = 0; i < caches.size(); i++) {
- GridCacheContext cctx = caches.get(i);
-
- if (cctx.recordEvent(type)) {
- cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
- cctx.localNode(),
- "Cache rebalancing event.",
- type,
- part,
- discoNode,
- discoType,
- discoTs));
- }
+ List<GridCacheContext> caches = this.caches;
+
+ for (int i = 0; i < caches.size(); i++) {
+ GridCacheContext cctx = caches.get(i);
+
+ if (cctx.recordEvent(type)) {
+ cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+ cctx.localNode(),
+ "Cache rebalancing event.",
+ type,
+ part,
+ discoNode,
+ discoType,
+ discoTs));
}
}
}
@@ -353,19 +361,19 @@ public class CacheGroupInfrastructure {
LT.warn(log, "Added event without checking if event is recordable: " +
U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED));
- synchronized (caches) {
- for (int i = 0; i < caches.size(); i++) {
- GridCacheContext cctx = caches.get(i);
+ List<GridCacheContext> caches = this.caches;
- cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
- cctx.localNode(),
- "Cache unloading event.",
- EVT_CACHE_REBALANCE_PART_UNLOADED,
- part,
- null,
- 0,
- 0));
- }
+ for (int i = 0; i < caches.size(); i++) {
+ GridCacheContext cctx = caches.get(i);
+
+ cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+ cctx.localNode(),
+ "Cache unloading event.",
+ EVT_CACHE_REBALANCE_PART_UNLOADED,
+ part,
+ null,
+ 0,
+ 0));
}
}
@@ -391,25 +399,25 @@ public class CacheGroupInfrastructure {
boolean hasOldVal,
boolean keepBinary
) {
- synchronized (caches) {
- for (int i = 0; i < caches.size(); i++) {
- GridCacheContext cctx = caches.get(i);
-
- cctx.events().addEvent(part,
- key,
- evtNodeId,
- (IgniteUuid)null,
- null,
- type,
- newVal,
- hasNewVal,
- oldVal,
- hasOldVal,
- null,
- null,
- null,
- keepBinary);
- }
+ List<GridCacheContext> caches = this.caches;
+
+ for (int i = 0; i < caches.size(); i++) {
+ GridCacheContext cctx = caches.get(i);
+
+ cctx.events().addEvent(part,
+ key,
+ evtNodeId,
+ (IgniteUuid)null,
+ null,
+ type,
+ newVal,
+ hasNewVal,
+ oldVal,
+ hasOldVal,
+ null,
+ null,
+ null,
+ keepBinary);
}
}
@@ -620,26 +628,26 @@ public class CacheGroupInfrastructure {
* @return {@code True} if group contains caches.
*/
boolean hasCaches() {
- synchronized (caches) {
- return !caches.isEmpty();
- }
+ List<GridCacheContext> caches = this.caches;
+
+ return !caches.isEmpty();
}
/**
* @param part Partition ID.
*/
public void onPartitionEvicted(int part) {
- synchronized (caches) {
- for (int i = 0; i < caches.size(); i++) {
- GridCacheContext cctx = caches.get(i);
+ List<GridCacheContext> caches = this.caches;
- if (cctx.isDrEnabled())
- cctx.dr().partitionEvicted(part);
+ for (int i = 0; i < caches.size(); i++) {
+ GridCacheContext cctx = caches.get(i);
- cctx.continuousQueries().onPartitionEvicted(part);
+ if (cctx.isDrEnabled())
+ cctx.dr().partitionEvicted(part);
- cctx.dataStructures().onPartitionEvicted(part);
- }
+ cctx.continuousQueries().onPartitionEvicted(part);
+
+ cctx.dataStructures().onPartitionEvicted(part);
}
}
[7/8] ignite git commit: Merge remote-tracking branch
'remotes/origin/master' into ignite-5075
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7fc01683
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7fc01683
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7fc01683
Branch: refs/heads/ignite-5075-pds
Commit: 7fc01683a7e0995d41ad8d900767d6e48443d8b2
Parents: 19ed098 ca94cf3
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 23 15:05:52 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 23 15:05:52 2017 +0300
----------------------------------------------------------------------
.../store/cassandra/CassandraCacheStore.java | 15 +-
.../ignite/tests/IgnitePersistentStoreTest.java | 62 ++++-
.../persistence/loadall_blob/ignite-config.xml | 90 +++++++
.../loadall_blob/persistence-settings.xml | 29 +++
.../store/jdbc/CacheAbstractJdbcStore.java | 6 +-
.../managers/communication/GridIoManager.java | 30 ++-
.../communication/GridIoMessageFactory.java | 2 +
.../platform/cache/PlatformCache.java | 28 ++-
.../cache/query/GridCacheTwoStepQuery.java | 84 ++-----
.../processors/cache/query/QueryTable.java | 164 +++++++++++++
.../processors/query/h2/IgniteH2Indexing.java | 84 +++----
.../query/h2/opt/GridH2IndexBase.java | 2 +-
.../processors/query/h2/opt/GridH2Table.java | 28 ++-
.../query/h2/sql/GridSqlQuerySplitter.java | 21 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 15 +-
.../h2/twostep/GridReduceQueryExecutor.java | 233 +++++++++----------
.../h2/twostep/msg/GridH2QueryRequest.java | 13 +-
.../twostep/msg/GridH2ValueMessageFactory.java | 4 +
.../core/include/ignite/cache/query/query_sql.h | 2 +
.../ignite/cache/query/query_sql_fields.h | 3 +
.../Cache/Query/CacheLinqTest.cs | 36 ++-
.../Cache/Query/CacheQueriesTest.cs | 60 ++++-
.../Cache/Query/SqlFieldsQuery.cs | 29 ++-
.../Apache.Ignite.Core/Cache/Query/SqlQuery.cs | 31 +++
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 3 +
.../Apache.Ignite.Linq/CacheExtensions.cs | 10 +
.../Impl/CacheFieldsQueryExecutor.cs | 42 ++--
.../Apache.Ignite.Linq/Impl/CacheQueryable.cs | 3 +-
.../dotnet/Apache.Ignite.Linq/QueryOptions.cs | 23 ++
.../spring/SpringTransactionManager.java | 13 ++
.../GridSpringTransactionManagerSelfTest.java | 45 ++++
31 files changed, 892 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fc01683/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7fc01683/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8286b45,75914ef..bba5fca
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@ -1163,10 -1143,11 +1143,11 @@@ public class GridReduceQueryExecutor
continue; // Skip unmapped partitions.
if (F.isEmpty(owners)) {
- if (!F.isEmpty(dataNodes(extraCctx.name(), NONE)))
+ if (!F.isEmpty(dataNodes(extraCctx.groupId(), NONE)))
return null; // Retry.
- throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() + ", part=" + p + "]");
+ throw new CacheException("Failed to find data nodes [cache=" + extraCctx.name() +
+ ", part=" + p + "]");
}
if (partLocs[p] == null)
[2/8] ignite git commit: Fixed "IGNITE-4205 CassandraCacheStore
should start IgniteThread threads in loadCache() method"
Posted by sb...@apache.org.
Fixed "IGNITE-4205 CassandraCacheStore should start IgniteThread threads in loadCache() method"
Signed-off-by: nikolay_tikhonov <nt...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/561d2cf0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/561d2cf0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/561d2cf0
Branch: refs/heads/ignite-5075-pds
Commit: 561d2cf048be59524acfbe2ac064d8b633b99c37
Parents: f74d51c
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon May 22 14:30:30 2017 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Mon May 22 14:30:30 2017 +0300
----------------------------------------------------------------------
.../store/cassandra/CassandraCacheStore.java | 15 +++-
.../ignite/tests/IgnitePersistentStoreTest.java | 62 +++++++++++++-
.../persistence/loadall_blob/ignite-config.xml | 90 ++++++++++++++++++++
.../loadall_blob/persistence-settings.xml | 29 +++++++
.../store/jdbc/CacheAbstractJdbcStore.java | 6 +-
5 files changed, 198 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 98c8b40..b438946 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -23,16 +23,17 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Statement;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStore;
@@ -52,7 +53,9 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.thread.IgniteThreadFactory;
/**
* Implementation of {@link CacheStore} backed by Cassandra database.
@@ -64,6 +67,14 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
/** Buffer to store mutations performed withing transaction. */
private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER";
+ /** Thread name. */
+ private static final String CACHE_LOADER_THREAD_NAME = "cassandra-cache-loader";
+
+ /** Auto-injected ignite instance. */
+ @SuppressWarnings("unused")
+ @IgniteInstanceResource
+ private Ignite ignite;
+
/** Auto-injected store session. */
@SuppressWarnings("unused")
@CacheStoreSessionResource
@@ -109,7 +120,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
Collection<Future<?>> futs = new ArrayList<>(args.length);
try {
- pool = Executors.newFixedThreadPool(maxPoolSize);
+ pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME));
CassandraSession ses = getCassandraSession();
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
index c8c7139..feccb24 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
@@ -25,9 +25,11 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.tests.pojos.Person;
@@ -42,9 +44,9 @@ import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.log4j.Logger;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.Assert;
import org.springframework.core.io.ClassPathResource;
/**
@@ -247,6 +249,34 @@ public class IgnitePersistentStoreTest {
/** */
@Test
+ public void blobBinaryLoadCacheTest() {
+ Ignition.stopAll(true);
+
+ try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
+ IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");
+
+ assert ignite.configuration().getMarshaller() instanceof BinaryMarshaller;
+
+ personCache.put(1L, new PojoPerson(1, "name"));
+
+ assert personCache.withKeepBinary().get(1L) instanceof BinaryObject;
+ }
+
+ Ignition.stopAll(true);
+
+ try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml")) {
+ IgniteCache<Long, PojoPerson> personCache = ignite.getOrCreateCache("cache2");
+
+ personCache.loadCache(null, null);
+
+ PojoPerson person = personCache.get(1L);
+
+ LOGGER.info("loadCache tests passed");
+ }
+ }
+
+ /** */
+ @Test
public void pojoStrategyTest() {
Ignition.stopAll(true);
@@ -673,4 +703,34 @@ public class IgnitePersistentStoreTest {
" concurrency and " + isolation + " isolation level");
LOGGER.info("-----------------------------------------------------------------------------------");
}
+
+ /** */
+ public static class PojoPerson {
+ /** */
+ private int id;
+
+ /** */
+ private String name;
+
+ /** */
+ public PojoPerson() {
+ // No-op.
+ }
+
+ /** */
+ public PojoPerson(int id, String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /** */
+ public int getId() {
+ return id;
+ }
+
+ /** */
+ public String getName() {
+ return name;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
new file mode 100644
index 0000000..115e263
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/ignite-config.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd">
+
+ <!-- Cassandra connection settings -->
+ <import resource="classpath:org/apache/ignite/tests/cassandra/connection-settings.xml"/>
+
+ <!-- Persistence settings for 'cache2' -->
+ <bean id="cache2_persistence_settings"
+ class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
+ <constructor-arg type="org.springframework.core.io.Resource"
+ value="classpath:org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml"/>
+ </bean>
+
+ <!-- Ignite configuration -->
+ <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+
+ <property name="marshaller">
+ <bean class="org.apache.ignite.internal.binary.BinaryMarshaller"/>
+ </property>
+
+ <property name="binaryConfiguration">
+ <bean class="org.apache.ignite.configuration.BinaryConfiguration">
+ <property name="compactFooter" value="false"/>
+ </bean>
+ </property>
+
+ <property name="cacheConfiguration">
+ <list>
+ <!-- Configuring persistence for "cache2" cache -->
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="name" value="cache2"/>
+ <property name="readThrough" value="true"/>
+ <property name="writeThrough" value="true"/>
+ <property name="storeKeepBinary" value="true"/>
+ <property name="cacheStoreFactory">
+ <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
+ <property name="dataSourceBean" value="cassandraAdminDataSource"/>
+ <property name="persistenceSettingsBean" value="cache2_persistence_settings"/>
+ </bean>
+ </property>
+ </bean>
+ </list>
+ </property>
+
+ <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+ <property name="discoverySpi">
+ <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <!--
+ Ignite provides several options for automatic discovery that can be used
+ instead os static IP based discovery. For information on all options refer
+ to our documentation: http://apacheignite.readme.io/docs/cluster-config
+ -->
+ <!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
+ <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">-->
+ <property name="addresses">
+ <list>
+ <!-- In distributed environment, replace with actual host IP address. -->
+ <value>127.0.0.1:47500..47509</value>
+ </list>
+ </property>
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
new file mode 100644
index 0000000..e872201
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/loadall_blob/persistence-settings.xml
@@ -0,0 +1,29 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<persistence keyspace="test1" table="blob_test3">
+ <!-- By default Java standard serialization is going to be used -->
+ <keyPersistence class="java.lang.Long"
+ strategy="BLOB"
+ column="key"/>
+
+ <!-- Kryo serialization specified to be used -->
+ <valuePersistence class="org.apache.ignite.tests.pojos.Person"
+ strategy="BLOB"
+ serializer="org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer"
+ column="value"/>
+</persistence>
http://git-wip-us.apache.org/repos/asf/ignite/blob/561d2cf0/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
index 46e9022..b1ec38d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheAbstractJdbcStore.java
@@ -69,6 +69,7 @@ import org.apache.ignite.lifecycle.LifecycleAware;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.thread.IgniteThreadFactory;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -121,6 +122,9 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
/** Connection attribute property name. */
protected static final String ATTR_CONN_PROP = "JDBC_STORE_CONNECTION";
+ /** Thread name. */
+ private static final String CACHE_LOADER_THREAD_NAME = "jdbc-cache-loader";
+
/** Built in Java types names. */
protected static final Collection<String> BUILT_IN_TYPES = new HashSet<>();
@@ -680,7 +684,7 @@ public abstract class CacheAbstractJdbcStore<K, V> implements CacheStore<K, V>,
String cacheName = session().cacheName();
try {
- pool = Executors.newFixedThreadPool(maxPoolSize);
+ pool = Executors.newFixedThreadPool(maxPoolSize, new IgniteThreadFactory(ignite.name(), CACHE_LOADER_THREAD_NAME));
Collection<Future<?>> futs = new ArrayList<>();
[4/8] ignite git commit: IGNITE-5257 .NET: SQL query timeouts
Posted by sb...@apache.org.
IGNITE-5257 .NET: SQL query timeouts
This closes #1985
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca94cf3d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca94cf3d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca94cf3d
Branch: refs/heads/ignite-5075-pds
Commit: ca94cf3d6c708218ef22aa40c07c436c75360bc6
Parents: c4bb996
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Mon May 22 17:45:10 2017 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Mon May 22 17:45:10 2017 +0300
----------------------------------------------------------------------
.../platform/cache/PlatformCache.java | 28 +++++++--
.../core/include/ignite/cache/query/query_sql.h | 2 +
.../ignite/cache/query/query_sql_fields.h | 3 +
.../Cache/Query/CacheLinqTest.cs | 36 ++++++++++--
.../Cache/Query/CacheQueriesTest.cs | 60 +++++++++++++++++---
.../Cache/Query/SqlFieldsQuery.cs | 29 +++++++++-
.../Apache.Ignite.Core/Cache/Query/SqlQuery.cs | 31 ++++++++++
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 3 +
.../Apache.Ignite.Linq/CacheExtensions.cs | 10 ++++
.../Impl/CacheFieldsQueryExecutor.cs | 42 ++++++--------
.../Apache.Ignite.Linq/Impl/CacheQueryable.cs | 3 +-
.../dotnet/Apache.Ignite.Linq/QueryOptions.cs | 23 ++++++++
12 files changed, 224 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index c61b75e..13a8ca1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -1282,8 +1282,16 @@ public class PlatformCache extends PlatformAbstractTarget {
Object[] args = readQueryArgs(reader);
boolean distrJoins = reader.readBoolean();
-
- return new SqlQuery(typ, sql).setPageSize(pageSize).setArgs(args).setLocal(loc).setDistributedJoins(distrJoins);
+ int timeout = reader.readInt();
+ boolean replicated = reader.readBoolean();
+
+ return new SqlQuery(typ, sql)
+ .setPageSize(pageSize)
+ .setArgs(args)
+ .setLocal(loc)
+ .setDistributedJoins(distrJoins)
+ .setTimeout(timeout, TimeUnit.MILLISECONDS)
+ .setReplicatedOnly(replicated);
}
/**
@@ -1301,9 +1309,19 @@ public class PlatformCache extends PlatformAbstractTarget {
boolean distrJoins = reader.readBoolean();
boolean enforceJoinOrder = reader.readBoolean();
-
- return new SqlFieldsQuery(sql).setPageSize(pageSize).setArgs(args).setLocal(loc)
- .setDistributedJoins(distrJoins).setEnforceJoinOrder(enforceJoinOrder);
+ int timeout = reader.readInt();
+ boolean replicated = reader.readBoolean();
+ boolean collocated = reader.readBoolean();
+
+ return new SqlFieldsQuery(sql)
+ .setPageSize(pageSize)
+ .setArgs(args)
+ .setLocal(loc)
+ .setDistributedJoins(distrJoins)
+ .setEnforceJoinOrder(enforceJoinOrder)
+ .setTimeout(timeout, TimeUnit.MILLISECONDS)
+ .setReplicatedOnly(replicated)
+ .setCollocated(collocated);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h
index d733476..eb0606a 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql.h
@@ -272,6 +272,8 @@ namespace ignite
(*it)->Write(writer);
writer.WriteBool(distributedJoins);
+ writer.WriteInt32(0); // Timeout, ms
+ writer.WriteBool(false); // ReplicatedOnly
}
private:
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
index 954cf43..db26fc4 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/query_sql_fields.h
@@ -295,6 +295,9 @@ namespace ignite
writer.WriteBool(distributedJoins);
writer.WriteBool(enforceJoinOrder);
+ writer.WriteInt32(0); // Timeout, ms
+ writer.WriteBool(false); // ReplicatedOnly
+ writer.WriteBool(false); // Colocated
}
private:
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs
index 265a149..cb3fece 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheLinqTest.cs
@@ -1273,7 +1273,10 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
{
Local = true,
PageSize = 999,
- EnforceJoinOrder = true
+ EnforceJoinOrder = true,
+ Timeout = TimeSpan.FromSeconds(2.5),
+ ReplicatedOnly = true,
+ Colocated = true
}).Where(x => x.Key > 10);
Assert.AreEqual(cache.Name, query.CacheName);
@@ -1288,12 +1291,16 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
Assert.AreEqual(999, fq.PageSize);
Assert.IsFalse(fq.EnableDistributedJoins);
Assert.IsTrue(fq.EnforceJoinOrder);
+ Assert.IsTrue(fq.ReplicatedOnly);
+ Assert.IsTrue(fq.Colocated);
+ Assert.AreEqual(TimeSpan.FromSeconds(2.5), fq.Timeout);
var str = query.ToString();
Assert.AreEqual("CacheQueryable [CacheName=person_org, TableName=Person, Query=SqlFieldsQuery " +
"[Sql=select _T0._key, _T0._val from \"person_org\".Person as _T0 where " +
"(_T0._key > ?), Arguments=[10], " +
- "Local=True, PageSize=999, EnableDistributedJoins=False, EnforceJoinOrder=True]]", str);
+ "Local=True, PageSize=999, EnableDistributedJoins=False, EnforceJoinOrder=True, " +
+ "Timeout=00:00:02.5000000, ReplicatedOnly=True, Colocated=True]]", str);
// Check fields query
var fieldsQuery = (ICacheQueryable) cache.AsCacheQueryable().Select(x => x.Value.Name);
@@ -1311,7 +1318,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
str = fieldsQuery.ToString();
Assert.AreEqual("CacheQueryable [CacheName=person_org, TableName=Person, Query=SqlFieldsQuery " +
"[Sql=select _T0.Name from \"person_org\".Person as _T0, Arguments=[], Local=False, " +
- "PageSize=1024, EnableDistributedJoins=False, EnforceJoinOrder=False]]", str);
+ "PageSize=1024, EnableDistributedJoins=False, EnforceJoinOrder=False, " +
+ "Timeout=00:00:00, ReplicatedOnly=False, Colocated=False]]", str);
// Check distributed joins flag propagation
var distrQuery = cache.AsCacheQueryable(new QueryOptions {EnableDistributedJoins = true})
@@ -1326,7 +1334,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
"[Sql=select _T0._key, _T0._val from \"person_org\".Person as _T0 where " +
"(((_T0._key > ?) and (_T0.age1 > ?)) " +
"and (_T0.Name like \'%\' || ? || \'%\') ), Arguments=[10, 20, x], Local=False, " +
- "PageSize=1024, EnableDistributedJoins=True, EnforceJoinOrder=False]]", str);
+ "PageSize=1024, EnableDistributedJoins=True, EnforceJoinOrder=False, " +
+ "Timeout=00:00:00, ReplicatedOnly=False, Colocated=False]]", str);
}
/// <summary>
@@ -1396,6 +1405,25 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
}
/// <summary>
+ /// Tests the query timeout.
+ /// </summary>
+ [Test]
+ public void TestTimeout()
+ {
+ var persons = GetPersonCache().AsCacheQueryable(new QueryOptions
+ {
+ Timeout = TimeSpan.FromMilliseconds(1),
+ EnableDistributedJoins = true
+ });
+
+ // ReSharper disable once ReturnValueOfPureMethodIsNotUsed
+ var ex = Assert.Throws<CacheException>(() =>
+ persons.SelectMany(p => GetRoleCache().AsCacheQueryable()).ToArray());
+
+ Assert.IsTrue(ex.ToString().Contains("QueryCancelledException: The query was cancelled while executing."));
+ }
+
+ /// <summary>
/// Gets the person cache.
/// </summary>
/// <returns></returns>
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
index 01277e1..60d2fdf 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Query/CacheQueriesTest.cs
@@ -105,8 +105,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
for (int i = 0; i < GridCnt; i++)
{
- for (int j = 0; j < MaxItemCnt; j++)
- cache.Remove(j);
+ cache.Clear();
Assert.IsTrue(cache.IsEmpty());
}
@@ -352,9 +351,15 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
// 2. Validate results.
var qry = new SqlQuery(typeof(QueryPerson), "age < 50", loc)
{
- EnableDistributedJoins = distrJoin
+ EnableDistributedJoins = distrJoin,
+ ReplicatedOnly = false,
+ Timeout = TimeSpan.FromSeconds(3)
};
+ Assert.AreEqual(string.Format("SqlQuery [Sql=age < 50, Arguments=[], Local={0}, " +
+ "PageSize=1024, EnableDistributedJoins={1}, Timeout={2}, " +
+ "ReplicatedOnly=False]", loc, distrJoin, qry.Timeout), qry.ToString());
+
ValidateQueryResults(cache, qry, exp, keepBinary);
}
@@ -376,7 +381,10 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
var qry = new SqlFieldsQuery("SELECT name, age FROM QueryPerson WHERE age < 50", loc)
{
EnableDistributedJoins = distrJoin,
- EnforceJoinOrder = enforceJoinOrder
+ EnforceJoinOrder = enforceJoinOrder,
+ Colocated = !distrJoin,
+ ReplicatedOnly = false,
+ Timeout = TimeSpan.FromSeconds(2)
};
using (IQueryCursor<IList> cursor = cache.QueryFields(qry))
@@ -673,6 +681,44 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
}
/// <summary>
+ /// Tests query timeouts.
+ /// </summary>
+ [Test]
+ public void TestSqlQueryTimeout()
+ {
+ var cache = Cache();
+ PopulateCache(cache, false, 20000, x => true);
+
+ var sqlQry = new SqlQuery(typeof(QueryPerson), "WHERE age < 500 AND name like '%1%'")
+ {
+ Timeout = TimeSpan.FromMilliseconds(2)
+ };
+
+ // ReSharper disable once ReturnValueOfPureMethodIsNotUsed
+ var ex = Assert.Throws<CacheException>(() => cache.Query(sqlQry).ToArray());
+ Assert.IsTrue(ex.ToString().Contains("QueryCancelledException: The query was cancelled while executing."));
+ }
+
+ /// <summary>
+ /// Tests fields query timeouts.
+ /// </summary>
+ [Test]
+ public void TestSqlFieldsQueryTimeout()
+ {
+ var cache = Cache();
+ PopulateCache(cache, false, 20000, x => true);
+
+ var fieldsQry = new SqlFieldsQuery("SELECT * FROM QueryPerson WHERE age < 5000 AND name like '%0%'")
+ {
+ Timeout = TimeSpan.FromMilliseconds(3)
+ };
+
+ // ReSharper disable once ReturnValueOfPureMethodIsNotUsed
+ var ex = Assert.Throws<CacheException>(() => cache.QueryFields(fieldsQry).ToArray());
+ Assert.IsTrue(ex.ToString().Contains("QueryCancelledException: The query was cancelled while executing."));
+ }
+
+ /// <summary>
/// Validates the query results.
/// </summary>
/// <param name="cache">Cache.</param>
@@ -820,7 +866,7 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
for (var i = 0; i < cnt; i++)
{
- var val = rand.Next(100);
+ var val = rand.Next(cnt);
cache.Put(val, new QueryPerson(val.ToString(), val));
@@ -845,8 +891,8 @@ namespace Apache.Ignite.Core.Tests.Cache.Query
public QueryPerson(string name, int age)
{
Name = name;
- Age = age;
- Birthday = DateTime.UtcNow.AddYears(-age);
+ Age = age % 2000;
+ Birthday = DateTime.UtcNow.AddYears(-Age);
}
/// <summary>
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
index aab2bfe..4809574 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlFieldsQuery.cs
@@ -17,6 +17,7 @@
namespace Apache.Ignite.Core.Cache.Query
{
+ using System;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
@@ -105,6 +106,28 @@ namespace Apache.Ignite.Core.Cache.Query
public bool EnforceJoinOrder { get; set; }
/// <summary>
+ /// Gets or sets the query timeout. Query will be automatically cancelled if the execution timeout is exceeded.
+ /// Default is <see cref="TimeSpan.Zero"/>, which means no timeout.
+ /// </summary>
+ public TimeSpan Timeout { get; set; }
+
+ /// <summary>
+ /// Gets or sets a value indicating whether this query contains only replicated tables.
+ /// This is a hint for potentially more effective execution.
+ /// </summary>
+ public bool ReplicatedOnly { get; set; }
+
+ /// <summary>
+ /// Gets or sets a value indicating whether this query operates on colocated data.
+ /// <para />
+ /// Whenever Ignite executes a distributed query, it sends sub-queries to individual cluster members.
+ /// If you know in advance that the elements of your query selection are colocated together on the same
+ /// node and you group by colocated key (primary or affinity key), then Ignite can make significant
+ /// performance and network optimizations by grouping data on remote nodes.
+ /// </summary>
+ public bool Colocated { get; set; }
+
+ /// <summary>
/// Returns a <see cref="string" /> that represents this instance.
/// </summary>
/// <returns>
@@ -115,8 +138,10 @@ namespace Apache.Ignite.Core.Cache.Query
var args = string.Join(", ", Arguments.Select(x => x == null ? "null" : x.ToString()));
return string.Format("SqlFieldsQuery [Sql={0}, Arguments=[{1}], Local={2}, PageSize={3}, " +
- "EnableDistributedJoins={4}, EnforceJoinOrder={5}]", Sql, args, Local,
- PageSize, EnableDistributedJoins, EnforceJoinOrder);
+ "EnableDistributedJoins={4}, EnforceJoinOrder={5}, Timeout={6}, ReplicatedOnly={7}" +
+ ", Colocated={8}]", Sql, args, Local,
+ PageSize, EnableDistributedJoins, EnforceJoinOrder, Timeout, ReplicatedOnly,
+ Colocated);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs
index 70e08b2..7d8e8fb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Query/SqlQuery.cs
@@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Cache.Query
{
using System;
using System.Diagnostics.CodeAnalysis;
+ using System.Linq;
using Apache.Ignite.Core.Impl.Binary;
using Apache.Ignite.Core.Impl.Cache;
using Apache.Ignite.Core.Impl.Common;
@@ -108,6 +109,18 @@ namespace Apache.Ignite.Core.Cache.Query
/// </value>
public bool EnableDistributedJoins { get; set; }
+ /// <summary>
+ /// Gets or sets the query timeout. Query will be automatically cancelled if the execution timeout is exceeded.
+ /// Default is <see cref="TimeSpan.Zero"/>, which means no timeout.
+ /// </summary>
+ public TimeSpan Timeout { get; set; }
+
+ /// <summary>
+ /// Gets or sets a value indicating whether this query contains only replicated tables.
+ /// This is a hint for potentially more effective execution.
+ /// </summary>
+ public bool ReplicatedOnly { get; set; }
+
/** <inheritDoc /> */
internal override void Write(BinaryWriter writer, bool keepBinary)
{
@@ -126,6 +139,8 @@ namespace Apache.Ignite.Core.Cache.Query
WriteQueryArgs(writer, Arguments);
writer.WriteBoolean(EnableDistributedJoins);
+ writer.WriteInt((int) Timeout.TotalMilliseconds);
+ writer.WriteBoolean(ReplicatedOnly);
}
/** <inheritDoc /> */
@@ -133,5 +148,21 @@ namespace Apache.Ignite.Core.Cache.Query
{
get { return CacheOp.QrySql; }
}
+
+ /// <summary>
+ /// Returns a <see cref="string" /> that represents this instance.
+ /// </summary>
+ /// <returns>
+ /// A <see cref="string" /> that represents this instance.
+ /// </returns>
+ public override string ToString()
+ {
+ var args = string.Join(", ", Arguments.Select(x => x == null ? "null" : x.ToString()));
+
+ return string.Format("SqlQuery [Sql={0}, Arguments=[{1}], Local={2}, PageSize={3}, " +
+ "EnableDistributedJoins={4}, Timeout={5}, ReplicatedOnly={6}]", Sql, args, Local,
+ PageSize, EnableDistributedJoins, Timeout, ReplicatedOnly);
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
index 749409c..95787eb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs
@@ -1086,6 +1086,9 @@ namespace Apache.Ignite.Core.Impl.Cache
writer.WriteBoolean(qry.EnableDistributedJoins);
writer.WriteBoolean(qry.EnforceJoinOrder);
+ writer.WriteInt((int) qry.Timeout.TotalMilliseconds);
+ writer.WriteBoolean(qry.ReplicatedOnly);
+ writer.WriteBoolean(qry.Colocated);
});
return new FieldsQueryCursor<T>(cursor, Marshaller, _flagKeepBinary, readerFunc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs
index 4b536f4..2c609c6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Linq/CacheExtensions.cs
@@ -20,6 +20,7 @@ namespace Apache.Ignite.Linq
using System.Linq;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Configuration;
+ using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Linq.Impl;
/// <summary>
@@ -43,6 +44,8 @@ namespace Apache.Ignite.Linq
public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>(
this ICache<TKey, TValue> cache)
{
+ IgniteArgumentCheck.NotNull(cache, "cache");
+
return cache.AsCacheQueryable(false, null);
}
@@ -64,6 +67,8 @@ namespace Apache.Ignite.Linq
public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>(
this ICache<TKey, TValue> cache, bool local)
{
+ IgniteArgumentCheck.NotNull(cache, "cache");
+
return cache.AsCacheQueryable(local, null);
}
@@ -92,6 +97,8 @@ namespace Apache.Ignite.Linq
public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>(
this ICache<TKey, TValue> cache, bool local, string tableName)
{
+ IgniteArgumentCheck.NotNull(cache, "cache");
+
return cache.AsCacheQueryable(new QueryOptions {Local = local, TableName = tableName});
}
@@ -114,6 +121,9 @@ namespace Apache.Ignite.Linq
public static IQueryable<ICacheEntry<TKey, TValue>> AsCacheQueryable<TKey, TValue>(
this ICache<TKey, TValue> cache, QueryOptions queryOptions)
{
+ IgniteArgumentCheck.NotNull(cache, "cache");
+ IgniteArgumentCheck.NotNull(queryOptions, "queryOptions");
+
return new CacheQueryable<TKey, TValue>(cache, queryOptions);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs
index 8dfddc7..27082bd 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheFieldsQueryExecutor.cs
@@ -38,41 +38,26 @@ namespace Apache.Ignite.Linq.Impl
{
/** */
private readonly ICacheInternal _cache;
+
+ /** */
+ private readonly QueryOptions _options;
/** */
private static readonly CopyOnWriteConcurrentDictionary<ConstructorInfo, object> CtorCache =
new CopyOnWriteConcurrentDictionary<ConstructorInfo, object>();
- /** */
- private readonly bool _local;
-
- /** */
- private readonly int _pageSize;
-
- /** */
- private readonly bool _enableDistributedJoins;
-
- /** */
- private readonly bool _enforceJoinOrder;
-
/// <summary>
/// Initializes a new instance of the <see cref="CacheFieldsQueryExecutor" /> class.
/// </summary>
/// <param name="cache">The executor function.</param>
- /// <param name="local">Local flag.</param>
- /// <param name="pageSize">Size of the page.</param>
- /// <param name="enableDistributedJoins">Distributed joins flag.</param>
- /// <param name="enforceJoinOrder">Enforce join order flag.</param>
- public CacheFieldsQueryExecutor(ICacheInternal cache, bool local, int pageSize, bool enableDistributedJoins,
- bool enforceJoinOrder)
+ /// <param name="options">Query options.</param>
+ public CacheFieldsQueryExecutor(ICacheInternal cache, QueryOptions options)
{
Debug.Assert(cache != null);
+ Debug.Assert(options != null);
_cache = cache;
- _local = local;
- _pageSize = pageSize;
- _enableDistributedJoins = enableDistributedJoins;
- _enforceJoinOrder = enforceJoinOrder;
+ _options = options;
}
/** <inheritdoc /> */
@@ -252,11 +237,16 @@ namespace Apache.Ignite.Linq.Impl
/// </summary>
internal SqlFieldsQuery GetFieldsQuery(string text, object[] args)
{
- return new SqlFieldsQuery(text, _local, args)
+ return new SqlFieldsQuery(text)
{
- EnableDistributedJoins = _enableDistributedJoins,
- PageSize = _pageSize,
- EnforceJoinOrder = _enforceJoinOrder
+ EnableDistributedJoins = _options.EnableDistributedJoins,
+ PageSize = _options.PageSize,
+ EnforceJoinOrder = _options.EnforceJoinOrder,
+ Timeout = _options.Timeout,
+ ReplicatedOnly = _options.ReplicatedOnly,
+ Colocated = _options.Colocated,
+ Local = _options.Local,
+ Arguments = args
};
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs
index 7372776..e271363 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Linq/Impl/CacheQueryable.cs
@@ -33,8 +33,7 @@ namespace Apache.Ignite.Linq.Impl
/// <param name="queryOptions">The query options.</param>
public CacheQueryable(ICache<TKey, TValue> cache, QueryOptions queryOptions)
: base(new CacheFieldsQueryProvider(CacheQueryParser.Instance,
- new CacheFieldsQueryExecutor((ICacheInternal) cache, queryOptions.Local, queryOptions.PageSize,
- queryOptions.EnableDistributedJoins, queryOptions.EnforceJoinOrder),
+ new CacheFieldsQueryExecutor((ICacheInternal) cache, queryOptions),
cache.Ignite, cache.GetConfiguration(), queryOptions.TableName, typeof(TValue)))
{
// No-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca94cf3d/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs b/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs
index c70152e..17b3705 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Linq/QueryOptions.cs
@@ -17,6 +17,7 @@
namespace Apache.Ignite.Linq
{
+ using System;
using System.ComponentModel;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Cache.Query;
@@ -87,5 +88,27 @@ namespace Apache.Ignite.Linq
/// <c>true</c> if join order should be enforced; otherwise, <c>false</c>.
/// </value>
public bool EnforceJoinOrder { get; set; }
+
+ /// <summary>
+ /// Gets or sets the query timeout. Query will be automatically cancelled if the execution timeout is exceeded.
+ /// Default is <see cref="TimeSpan.Zero"/>, which means no timeout.
+ /// </summary>
+ public TimeSpan Timeout { get; set; }
+
+ /// <summary>
+ /// Gets or sets a value indicating whether this query contains only replicated tables.
+ /// This is a hint for potentially more effective execution.
+ /// </summary>
+ public bool ReplicatedOnly { get; set; }
+
+ /// <summary>
+ /// Gets or sets a value indicating whether this query operates on colocated data.
+ /// <para />
+ /// Whenever Ignite executes a distributed query, it sends sub-queries to individual cluster members.
+ /// If you know in advance that the elements of your query selection are colocated together on the same
+ /// node and you group by colocated key (primary or affinity key), then Ignite can make significant
+ /// performance and network optimizations by grouping data on remote nodes.
+ /// </summary>
+ public bool Colocated { get; set; }
}
}