You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2017/04/17 07:04:07 UTC
[2/2] ignite git commit: ignite-4955 - Correctly execute SQL queries
started on replicated cache. - Fixes #1806.
ignite-4955 - Correctly execute SQL queries started on replicated cache. - Fixes #1806.
Signed-off-by: Sergi Vladykin <se...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ded599ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ded599ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ded599ae
Branch: refs/heads/master
Commit: ded599aed95501d7553dcf326590d9440a5e9ef7
Parents: 86c4058
Author: Sergi Vladykin <se...@gmail.com>
Authored: Mon Apr 17 10:03:20 2017 +0300
Committer: Sergi Vladykin <se...@gmail.com>
Committed: Mon Apr 17 10:03:20 2017 +0300
----------------------------------------------------------------------
.../ignite/cache/query/SqlFieldsQuery.java | 25 +++
.../org/apache/ignite/cache/query/SqlQuery.java | 31 +++-
.../processors/cache/IgniteCacheProxy.java | 4 +-
.../cache/query/GridCacheSqlQuery.java | 114 ++++----------
.../cache/query/GridCacheTwoStepQuery.java | 26 +++-
.../processors/query/h2/IgniteH2Indexing.java | 34 ++--
.../query/h2/sql/GridSqlQuerySplitter.java | 85 ++++------
.../query/h2/twostep/GridMapQueryExecutor.java | 68 +++++---
.../h2/twostep/GridReduceQueryExecutor.java | 154 ++++++++++---------
.../h2/twostep/msg/GridH2QueryRequest.java | 109 ++++++++++---
.../IgniteCacheAbstractFieldsQuerySelfTest.java | 51 +++---
...teCacheJoinPartitionedAndReplicatedTest.java | 10 ++
...iteCacheReplicatedFieldsQueryROSelfTest.java | 27 ++++
.../query/IgniteSqlSplitterSelfTest.java | 125 +++++++++++++--
.../IgniteCacheQuerySelfTestSuite.java | 4 +-
15 files changed, 563 insertions(+), 304 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 1f10ca8..8c3a4fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -67,6 +67,9 @@ public class SqlFieldsQuery extends Query<List<?>> {
/** */
private boolean distributedJoins;
+ /** */
+ private boolean replicatedOnly;
+
/**
* Constructs SQL fields query.
*
@@ -236,6 +239,28 @@ public class SqlFieldsQuery extends Query<List<?>> {
return (SqlFieldsQuery)super.setLocal(loc);
}
+ /**
+ * Specify if the query contains only replicated tables.
+ * This is a hint for potentially more effective execution.
+ *
+ * @param replicatedOnly The query contains only replicated tables.
+ * @return {@code this} For chaining.
+ */
+ public SqlFieldsQuery setReplicatedOnly(boolean replicatedOnly) {
+ this.replicatedOnly = replicatedOnly;
+
+ return this;
+ }
+
+ /**
+ * Check if the query contains only replicated tables.
+ *
+ * @return {@code true} If the query contains only replicated tables.
+ */
+ public boolean isReplicatedOnly() {
+ return replicatedOnly;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlFieldsQuery.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index d77e5ce..944c70e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -53,6 +53,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
/** */
private boolean distributedJoins;
+ /** */
+ private boolean replicatedOnly;
+
/**
* Constructs query for the given type name and SQL query.
*
@@ -197,7 +200,7 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
* @param type Type.
* @return {@code this} For chaining.
*/
- public SqlQuery setType(Class<?> type) {
+ public SqlQuery<K, V> setType(Class<?> type) {
return setType(QueryUtils.typeName(type));
}
@@ -210,7 +213,7 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
* @param distributedJoins Distributed joins enabled.
* @return {@code this} For chaining.
*/
- public SqlQuery setDistributedJoins(boolean distributedJoins) {
+ public SqlQuery<K, V> setDistributedJoins(boolean distributedJoins) {
this.distributedJoins = distributedJoins;
return this;
@@ -219,12 +222,34 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
/**
* Check if distributed joins are enabled for this query.
*
- * @return {@code true} If distributed joind enabled.
+ * @return {@code true} If distributed joins enabled.
*/
public boolean isDistributedJoins() {
return distributedJoins;
}
+ /**
+ * Specify if the query contains only replicated tables.
+ * This is a hint for potentially more effective execution.
+ *
+ * @param replicatedOnly The query contains only replicated tables.
+ * @return {@code this} For chaining.
+ */
+ public SqlQuery<K, V> setReplicatedOnly(boolean replicatedOnly) {
+ this.replicatedOnly = replicatedOnly;
+
+ return this;
+ }
+
+ /**
+ * Check if the query contains only replicated tables.
+ *
+ * @return {@code true} If the query contains only replicated tables.
+ */
+ public boolean isReplicatedOnly() {
+ return replicatedOnly;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlQuery.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 14edcac..98f2f93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -780,7 +780,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
final SqlQuery p = (SqlQuery)qry;
- if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
+ if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
return (QueryCursor<R>)ctx.kernalContext().query().queryLocal(ctx, p,
opCtxCall != null && opCtxCall.isKeepBinary());
@@ -794,7 +794,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
SqlFieldsQuery p = (SqlFieldsQuery)qry;
- if (isReplicatedDataNode() || ctx.isLocal() || qry.isLocal())
+ if ((p.isReplicatedOnly() && isReplicatedDataNode()) || ctx.isLocal() || qry.isLocal())
return (QueryCursor<R>)ctx.kernalContext().query().queryLocalFields(ctx, p);
return (QueryCursor<R>)ctx.kernalContext().query().queryTwoStep(ctx, p);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index ea07fb7..780e462 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -21,17 +21,11 @@ import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -39,7 +33,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Query.
*/
-public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
+public class GridCacheSqlQuery implements Message {
/** */
private static final long serialVersionUID = 0L;
@@ -51,26 +45,12 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
private String qry;
/** */
- @GridToStringInclude(sensitive = true)
- @GridDirectTransient
- private Object[] params;
-
- /** */
- private byte[] paramsBytes;
-
- /** */
@GridToStringInclude
- @GridDirectTransient
private int[] paramIdxs;
/** */
@GridToStringInclude
@GridDirectTransient
- private int paramsSize;
-
- /** */
- @GridToStringInclude
- @GridDirectTransient
private LinkedHashMap<String, ?> cols;
/** Field kept for backward compatibility. */
@@ -140,13 +120,6 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
}
/**
- * @return Parameters.
- */
- public Object[] parameters() {
- return params;
- }
-
- /**
* @return Parameter indexes.
*/
public int[] parameterIndexes() {
@@ -154,57 +127,16 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
}
/**
- * @param params Parameters.
* @param paramIdxs Parameter indexes.
- * @return {@code this} For chaining.
+ * @return {@code this}.
*/
- public GridCacheSqlQuery parameters(Object[] params, int[] paramIdxs) {
- this.params = F.isEmpty(params) ? EMPTY_PARAMS : params;
-
- paramsSize = this.params.length;
-
+ public GridCacheSqlQuery parameterIndexes(int[] paramIdxs) {
this.paramIdxs = paramIdxs;
return this;
}
/** {@inheritDoc} */
- @Override public void marshall(Marshaller m) {
- if (paramsBytes != null)
- return;
-
- assert params != null;
-
- try {
- paramsBytes = U.marshal(m, params);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
- if (params != null)
- return;
-
- assert paramsBytes != null;
-
- try {
- final ClassLoader ldr = U.resolveClassLoader(ctx.config());
-
- if (m instanceof BinaryMarshaller)
- // To avoid deserializing of enum types.
- params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
- else
- params = U.unmarshal(m, paramsBytes, ldr);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
}
@@ -239,7 +171,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
writer.incrementState();
case 2:
- if (!writer.writeByteArray("paramsBytes", paramsBytes))
+ if (!writer.writeIntArray("paramIdxs", paramIdxs))
return false;
writer.incrementState();
@@ -280,7 +212,7 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 2:
- paramsBytes = reader.readByteArray("paramsBytes");
+ paramIdxs = reader.readIntArray("paramIdxs");
if (!reader.isLastRead())
return false;
@@ -311,28 +243,17 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
}
/**
- * @param args Arguments.
* @return Copy.
*/
- public GridCacheSqlQuery copy(Object[] args) {
+ public GridCacheSqlQuery copy() {
GridCacheSqlQuery cp = new GridCacheSqlQuery();
cp.qry = qry;
cp.cols = cols;
cp.paramIdxs = paramIdxs;
- cp.paramsSize = paramsSize;
cp.sort = sort;
cp.partitioned = partitioned;
- if (F.isEmpty(args))
- cp.params = EMPTY_PARAMS;
- else {
- cp.params = new Object[paramsSize];
-
- for (int paramIdx : paramIdxs)
- cp.params[paramIdx] = args[paramIdx];
- }
-
return cp;
}
@@ -380,4 +301,27 @@ public class GridCacheSqlQuery implements Message, GridCacheQueryMarshallable {
return this;
}
+
+ /**
+ * @param allParams All parameters.
+ * @return Parameters only for this query.
+ */
+ public Object[] parameters(Object[] allParams) {
+ if (F.isEmpty(paramIdxs))
+ return EMPTY_PARAMS;
+
+ assert !F.isEmpty(allParams);
+
+ int maxIdx = paramIdxs[paramIdxs.length - 1];
+
+ Object[] res = new Object[maxIdx + 1];
+
+ for (int i = 0; i < paramIdxs.length; i++) {
+ int idx = paramIdxs[i];
+
+ res[idx] = allParams[idx];
+ }
+
+ return res;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index c127eeb..0e31dc0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -95,7 +95,7 @@ public class GridCacheTwoStepQuery {
/**
* Check if distributed joins are enabled for this query.
*
- * @return {@code true} If distributed joind enabled.
+ * @return {@code true} If distributed joins enabled.
*/
public boolean distributedJoins() {
return distributedJoins;
@@ -146,12 +146,23 @@ public class GridCacheTwoStepQuery {
/**
* @param qry SQL Query.
- * @return {@code this}.
*/
- public GridCacheTwoStepQuery addMapQuery(GridCacheSqlQuery qry) {
+ public void addMapQuery(GridCacheSqlQuery qry) {
mapQrys.add(qry);
+ }
+
+ /**
+ * @return {@code true} If all the map queries contain only replicated tables.
+ */
+ public boolean isReplicatedOnly() {
+ assert !mapQrys.isEmpty();
+
+ for (int i = 0; i < mapQrys.size(); i++) {
+ if (mapQrys.get(i).isPartitioned())
+ return false;
+ }
- return this;
+ return true;
}
/**
@@ -246,10 +257,9 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param args New arguments to copy with.
* @return Copy.
*/
- public GridCacheTwoStepQuery copy(Object[] args) {
+ public GridCacheTwoStepQuery copy() {
assert !explain;
GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(originalSql, schemas, tbls);
@@ -257,13 +267,13 @@ public class GridCacheTwoStepQuery {
cp.caches = caches;
cp.extraCaches = extraCaches;
cp.spaces = spaces;
- cp.rdc = rdc.copy(args);
+ cp.rdc = rdc.copy();
cp.skipMergeTbl = skipMergeTbl;
cp.pageSize = pageSize;
cp.distributedJoins = distributedJoins;
for (int i = 0; i < mapQrys.size(); i++)
- cp.mapQrys.add(mapQrys.get(i).copy(args));
+ cp.mapQrys.add(mapQrys.get(i).copy());
return cp;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 5f2d8c0..531b760 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -78,7 +78,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -91,8 +90,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
@@ -101,8 +98,8 @@ import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
import org.apache.ignite.internal.processors.query.GridQueryIndexing;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.database.H2PkHashIndex;
import org.apache.ignite.internal.processors.query.h2.database.H2RowFactory;
import org.apache.ignite.internal.processors.query.h2.database.H2TreeIndex;
@@ -110,6 +107,7 @@ import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasInnerI
import org.apache.ignite.internal.processors.query.h2.database.io.H2ExtrasLeafIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2InnerIO;
import org.apache.ignite.internal.processors.query.h2.database.io.H2LeafIO;
+import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2DefaultTableEngine;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOffheap;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
@@ -1299,13 +1297,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param enforceJoinOrder Enforce join order of tables.
* @return Iterable result.
*/
- private Iterable<List<?>> runQueryTwoStep(final GridCacheContext<?,?> cctx, final GridCacheTwoStepQuery qry,
- final boolean keepCacheObj, final boolean enforceJoinOrder,
+ private Iterable<List<?>> runQueryTwoStep(
+ final GridCacheContext<?,?> cctx,
+ final GridCacheTwoStepQuery qry,
+ final boolean keepCacheObj,
+ final boolean enforceJoinOrder,
final int timeoutMillis,
- final GridQueryCancel cancel) {
+ final GridQueryCancel cancel,
+ final Object[] params
+ ) {
return new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
- return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel);
+ return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params);
}
};
}
@@ -1403,7 +1406,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
if (cachedQry != null) {
- twoStepQry = cachedQry.twoStepQry.copy(qry.getArgs());
+ twoStepQry = cachedQry.twoStepQry.copy();
meta = cachedQry.meta;
}
else {
@@ -1539,12 +1542,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
cancel = new GridQueryCancel();
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
- runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel), cancel);
+ runQueryTwoStep(cctx, twoStepQry, cctx.keepBinary(), enforceJoinOrder, qry.getTimeout(), cancel, qry.getArgs()),
+ cancel);
cursor.fieldsMeta(meta);
if (cachedQry == null && !twoStepQry.explain()) {
- cachedQry = new TwoStepCachedQuery(meta, twoStepQry.copy(null));
+ cachedQry = new TwoStepCachedQuery(meta, twoStepQry.copy());
twoStepCache.putIfAbsent(cachedQryKey, cachedQry);
}
@@ -1556,7 +1560,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
*/
private void checkCacheIndexSegmentation(List<Integer> caches) {
if (caches.isEmpty())
- return; //Nnothing to check
+ return; // Nothing to check
GridCacheSharedContext sharedContext = ctx.cache().context();
@@ -1567,12 +1571,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
assert cctx != null;
- if(!cctx.isPartitioned())
+ if (!cctx.isPartitioned())
continue;
- if(expectedParallelism == 0)
+ if (expectedParallelism == 0)
expectedParallelism = cctx.config().getQueryParallelism();
- else if (expectedParallelism != 0 && cctx.config().getQueryParallelism() != expectedParallelism)
+ else if (cctx.config().getQueryParallelism() != expectedParallelism)
throw new IllegalStateException("Using indexes with different parallelism levels in same query is forbidden.");
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index aec0b36..b3d54e1 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -43,7 +44,6 @@ import org.h2.command.Prepared;
import org.h2.command.dml.Query;
import org.h2.command.dml.SelectUnion;
import org.h2.jdbc.JdbcPreparedStatement;
-import org.h2.util.IntArray;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
@@ -67,7 +67,6 @@ import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect.W
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect.childIndexForColumn;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion.LEFT_CHILD;
import static org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion.RIGHT_CHILD;
-import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.toArray;
/**
* Splits a single SQL query into two step map-reduce query.
@@ -211,7 +210,7 @@ public class GridSqlQuerySplitter {
boolean allCollocated = true;
for (GridCacheSqlQuery mapSqlQry : splitter.mapSqlQrys) {
- Prepared prepared = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(),
+ Prepared prepared = optimize(h2, conn, mapSqlQry.query(), mapSqlQry.parameters(params),
true, enforceJoinOrder);
allCollocated &= isCollocated((Query)prepared);
@@ -1344,11 +1343,18 @@ public class GridSqlQuerySplitter {
* @param params All parameters.
*/
private static void setupParameters(GridCacheSqlQuery sqlQry, GridSqlQuery qryAst, Object[] params) {
- IntArray paramIdxs = new IntArray(params.length);
+ TreeSet<Integer> paramIdxs = new TreeSet<>();
- params = findParams(qryAst, params, new ArrayList<>(params.length), paramIdxs).toArray();
+ findParamsQuery(qryAst, params, paramIdxs);
- sqlQry.parameters(params, toArray(paramIdxs));
+ int[] paramIdxsArr = new int[paramIdxs.size()];
+
+ int i = 0;
+
+ for (Integer paramIdx : paramIdxs)
+ paramIdxsArr[i++] = paramIdx;
+
+ sqlQry.parameterIndexes(paramIdxsArr);
}
/**
@@ -1451,9 +1457,8 @@ public class GridSqlQuerySplitter {
/**
* @param prnt Table parent element.
* @param childIdx Child index for the table or alias containing the table.
- * @return Generated alias.
*/
- private GridSqlAlias generateUniqueAlias(GridSqlAst prnt, int childIdx) {
+ private void generateUniqueAlias(GridSqlAst prnt, int childIdx) {
GridSqlAst child = prnt.child(childIdx);
GridSqlAst tbl = GridSqlAlias.unwrap(child);
@@ -1468,8 +1473,6 @@ public class GridSqlQuerySplitter {
// Replace the child in the parent.
prnt.child(childIdx, uniqueAliasAst);
-
- return uniqueAliasAst;
}
/**
@@ -1586,64 +1589,54 @@ public class GridSqlQuerySplitter {
/**
* @param qry Select.
* @param params Parameters.
- * @param target Extracted parameters.
* @param paramIdxs Parameter indexes.
- * @return Extracted parameters list.
*/
- private static List<Object> findParams(GridSqlQuery qry, Object[] params, ArrayList<Object> target,
- IntArray paramIdxs) {
+ private static void findParamsQuery(GridSqlQuery qry, Object[] params, TreeSet<Integer> paramIdxs) {
if (qry instanceof GridSqlSelect)
- return findParams((GridSqlSelect)qry, params, target, paramIdxs);
-
- GridSqlUnion union = (GridSqlUnion)qry;
-
- findParams(union.left(), params, target, paramIdxs);
- findParams(union.right(), params, target, paramIdxs);
+ findParamsSelect((GridSqlSelect)qry, params, paramIdxs);
+ else {
+ GridSqlUnion union = (GridSqlUnion)qry;
- findParams(qry.limit(), params, target, paramIdxs);
- findParams(qry.offset(), params, target, paramIdxs);
+ findParamsQuery(union.left(), params, paramIdxs);
+ findParamsQuery(union.right(), params, paramIdxs);
- return target;
+ findParams(qry.limit(), params, paramIdxs);
+ findParams(qry.offset(), params, paramIdxs);
+ }
}
/**
* @param select Select.
* @param params Parameters.
- * @param target Extracted parameters.
* @param paramIdxs Parameter indexes.
- * @return Extracted parameters list.
*/
- private static List<Object> findParams(
+ private static void findParamsSelect(
GridSqlSelect select,
Object[] params,
- ArrayList<Object> target,
- IntArray paramIdxs
+ TreeSet<Integer> paramIdxs
) {
if (params.length == 0)
- return target;
+ return;
for (GridSqlAst el : select.columns(false))
- findParams(el, params, target, paramIdxs);
+ findParams(el, params, paramIdxs);
- findParams(select.from(), params, target, paramIdxs);
- findParams(select.where(), params, target, paramIdxs);
+ findParams(select.from(), params, paramIdxs);
+ findParams(select.where(), params, paramIdxs);
// Don't search in GROUP BY and HAVING since they expected to be in select list.
- findParams(select.limit(), params, target, paramIdxs);
- findParams(select.offset(), params, target, paramIdxs);
-
- return target;
+ findParams(select.limit(), params, paramIdxs);
+ findParams(select.offset(), params, paramIdxs);
}
/**
* @param el Element.
* @param params Parameters.
- * @param target Extracted parameters.
* @param paramIdxs Parameter indexes.
*/
- private static void findParams(@Nullable GridSqlAst el, Object[] params, ArrayList<Object> target,
- IntArray paramIdxs) {
+ private static void findParams(@Nullable GridSqlAst el, Object[] params,
+ TreeSet<Integer> paramIdxs) {
if (el == null)
return;
@@ -1652,27 +1645,17 @@ public class GridSqlQuerySplitter {
// Here we will set them to NULL.
final int idx = ((GridSqlParameter)el).index();
- while (target.size() < idx)
- target.add(null);
-
if (params.length <= idx)
throw new IgniteException("Invalid number of query parameters. " +
"Cannot find " + idx + " parameter.");
- Object param = params[idx];
-
- if (idx == target.size())
- target.add(param);
- else
- target.set(idx, param);
-
paramIdxs.add(idx);
}
else if (el instanceof GridSqlSubquery)
- findParams(((GridSqlSubquery)el).subquery(), params, target, paramIdxs);
+ findParamsQuery(((GridSqlSubquery)el).subquery(), params, paramIdxs);
else {
for (int i = 0; i < el.size(); i++)
- findParams((GridSqlAst)el.child(i), params, target, paramIdxs);
+ findParams(el.child(i), params, paramIdxs);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 7cd9f17..e4347b5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -49,6 +49,7 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsReservation;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
@@ -401,6 +402,26 @@ public class GridMapQueryExecutor {
}
/**
+ * @param caches Cache IDs.
+ * @return The first found partitioned cache.
+ */
+ private GridCacheContext<?,?> findFirstPartitioned(List<Integer> caches) {
+ GridCacheSharedContext<?,?> sctx = ctx.cache().context();
+
+ for (int i = 0; i < caches.size(); i++) {
+ GridCacheContext<?,?> mainCctx = sctx.cacheContext(caches.get(i));
+
+ if (mainCctx == null)
+ throw new CacheException("Failed to find cache.");
+
+ if (!mainCctx.isLocal() && !mainCctx.isReplicated())
+ return mainCctx;
+ }
+
+ throw new IllegalStateException("Failed to find a partitioned cache.");
+ }
+
+ /**
* @param node Node.
* @param req Query request.
*/
@@ -408,12 +429,7 @@ public class GridMapQueryExecutor {
final Map<UUID,int[]> partsMap = req.partitions();
final int[] parts = partsMap == null ? null : partsMap.get(ctx.localNodeId());
- assert req.caches() != null && !req.caches().isEmpty();
-
- GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext( req.caches().get(0));
-
- if (mainCctx == null)
- throw new CacheException("Failed to find cache.");
+ assert !F.isEmpty(req.caches());
final DistributedJoinMode joinMode = distributedJoinMode(
req.isFlagSet(GridH2QueryRequest.FLAG_IS_LOCAL),
@@ -421,8 +437,12 @@ public class GridMapQueryExecutor {
final boolean enforceJoinOrder = req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER);
final boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
+ final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
+
+ int segments = explain || replicated ? 1 :
+ findFirstPartitioned(req.caches()).config().getQueryParallelism();
- int segments = explain ? 1 : mainCctx.config().getQueryParallelism();
+ final Object[] params = req.parameters();
for (int i = 1; i < segments; i++) {
final int segment = i;
@@ -442,7 +462,9 @@ public class GridMapQueryExecutor {
req.pageSize(),
joinMode,
enforceJoinOrder,
- req.timeout());
+ replicated,
+ req.timeout(),
+ params);
return null;
}
@@ -462,7 +484,9 @@ public class GridMapQueryExecutor {
req.pageSize(),
joinMode,
enforceJoinOrder,
- req.timeout());
+ replicated,
+ req.timeout(),
+ params);
}
/**
@@ -491,7 +515,9 @@ public class GridMapQueryExecutor {
int pageSize,
DistributedJoinMode distributedJoinMode,
boolean enforceJoinOrder,
- int timeout
+ boolean replicated,
+ int timeout,
+ Object[] params
) {
// Prepare to run queries.
GridCacheContext<?, ?> mainCctx = ctx.cache().context().cacheContext(cacheIds.get(0));
@@ -525,7 +551,7 @@ public class GridMapQueryExecutor {
node.id(),
reqId,
segmentId,
- mainCctx.isReplicated() ? REPLICATED : MAP)
+ replicated ? REPLICATED : MAP)
.filter(h2.backupFilter(topVer, parts))
.partitionsMap(partsMap)
.distributedJoinMode(distributedJoinMode)
@@ -579,7 +605,7 @@ public class GridMapQueryExecutor {
if (qry.node() == null ||
(segmentId == 0 && qry.node().equals(ctx.localNodeId()))) {
rs = h2.executeSqlQueryWithTimer(mainCctx.name(), conn, qry.query(),
- F.asList(qry.parameters()), true,
+ F.asList(qry.parameters(params)), true,
timeout,
qr.cancels[qryIdx]);
@@ -594,7 +620,7 @@ public class GridMapQueryExecutor {
qry.query(),
null,
null,
- qry.parameters(),
+ params,
node.id(),
null));
}
@@ -602,7 +628,7 @@ public class GridMapQueryExecutor {
assert rs instanceof JdbcResultSet : rs.getClass();
}
- qr.addResult(qryIdx, qry, node.id(), rs);
+ qr.addResult(qryIdx, qry, node.id(), rs, params);
if (qr.canceled) {
qr.result(qryIdx).close();
@@ -965,8 +991,8 @@ public class GridMapQueryExecutor {
* @param qrySrcNodeId Query source node.
* @param rs Result set.
*/
- void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs) {
- if (!results.compareAndSet(qry, null, new QueryResult(rs, cctx, qrySrcNodeId, q)))
+ void addResult(int qry, GridCacheSqlQuery q, UUID qrySrcNodeId, ResultSet rs, Object[] params) {
+ if (!results.compareAndSet(qry, null, new QueryResult(rs, cctx, qrySrcNodeId, q, params)))
throw new IllegalStateException();
}
@@ -1046,15 +1072,21 @@ public class GridMapQueryExecutor {
/** */
private volatile boolean closed;
+ /** Query params supplied to the constructor. */
+ private final Object[] params;
+
/**
* @param rs Result set.
* @param cctx Cache context.
* @param qrySrcNodeId Query source node.
* @param qry Query.
+ * @param params Query params.
*/
- private QueryResult(ResultSet rs, GridCacheContext<?, ?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry) {
+ private QueryResult(ResultSet rs, GridCacheContext<?, ?> cctx, UUID qrySrcNodeId, GridCacheSqlQuery qry,
+ Object[] params) {
this.cctx = cctx;
this.qry = qry;
+ this.params = params;
this.qrySrcNodeId = qrySrcNodeId;
this.cpNeeded = cctx.isLocalNode(qrySrcNodeId);
@@ -1139,7 +1171,7 @@ public class GridMapQueryExecutor {
qry.query(),
null,
null,
- qry.parameters(),
+ params,
qrySrcNodeId,
null,
null,
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 7d255b1..0421ca0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -62,10 +62,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
-import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
-import org.apache.ignite.internal.processors.query.h2.GridH2ResultSetIterator;
import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
@@ -102,6 +100,7 @@ import org.jsr166.ConcurrentHashMap8;
import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.setupConnection;
import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
@@ -406,12 +405,14 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param isReplicatedOnly If we must only have replicated caches.
* @param topVer Topology version.
* @param cctx Cache context for main space.
* @param extraSpaces Extra spaces.
* @return Data nodes or {@code null} if repartitioning started and we need to retry.
*/
private Collection<ClusterNode> stableDataNodes(
+ boolean isReplicatedOnly,
AffinityTopologyVersion topVer,
final GridCacheContext<?, ?> cctx,
List<Integer> extraSpaces
@@ -430,7 +431,7 @@ public class GridReduceQueryExecutor {
if (extraCctx.isLocal())
continue; // No consistency guaranties for local caches.
- if (cctx.isReplicated() && !extraCctx.isReplicated())
+ if (isReplicatedOnly && !extraCctx.isReplicated())
throw new CacheException("Queries running on replicated cache should not contain JOINs " +
"with partitioned tables [rCache=" + cctx.name() + ", pCache=" + extraSpace + "]");
@@ -439,7 +440,7 @@ public class GridReduceQueryExecutor {
if (F.isEmpty(extraNodes))
throw new CacheException("Failed to find data nodes for cache: " + extraSpace);
- if (cctx.isReplicated() && extraCctx.isReplicated()) {
+ if (isReplicatedOnly && extraCctx.isReplicated()) {
nodes.retainAll(extraNodes);
if (nodes.isEmpty()) {
@@ -450,7 +451,7 @@ public class GridReduceQueryExecutor {
", cache2=" + extraSpace + "]");
}
}
- else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
+ else if (!isReplicatedOnly && extraCctx.isReplicated()) {
if (!extraNodes.containsAll(nodes))
if (isPreloadingActive(cctx, extraSpaces))
return null; // Retry.
@@ -458,7 +459,7 @@ public class GridReduceQueryExecutor {
throw new CacheException("Caches have distinct sets of data nodes [cache1=" + cctx.name() +
", cache2=" + extraSpace + "]");
}
- else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
+ else if (!isReplicatedOnly && !extraCctx.isReplicated()) {
if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
if (isPreloadingActive(cctx, extraSpaces))
return null; // Retry.
@@ -481,6 +482,7 @@ public class GridReduceQueryExecutor {
* @param enforceJoinOrder Enforce join order of tables.
* @param timeoutMillis Timeout in milliseconds.
* @param cancel Query cancel.
+ * @param params Query parameters.
* @return Rows iterator.
*/
public Iterator<List<?>> query(
@@ -489,8 +491,14 @@ public class GridReduceQueryExecutor {
boolean keepPortable,
boolean enforceJoinOrder,
int timeoutMillis,
- GridQueryCancel cancel
+ GridQueryCancel cancel,
+ Object[] params
) {
+ if (F.isEmpty(params))
+ params = EMPTY_PARAMS;
+
+ final boolean isReplicatedOnly = qry.isReplicatedOnly();
+
for (int attempt = 0;; attempt++) {
if (attempt != 0) {
try {
@@ -524,7 +532,7 @@ public class GridReduceQueryExecutor {
nodes = singletonList(ctx.discovery().localNode());
else {
if (isPreloadingActive(cctx, extraSpaces)) {
- if (cctx.isReplicated())
+ if (isReplicatedOnly)
nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
else {
partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
@@ -533,19 +541,24 @@ public class GridReduceQueryExecutor {
}
}
else
- nodes = stableDataNodes(topVer, cctx, extraSpaces);
+ nodes = stableDataNodes(isReplicatedOnly, topVer, cctx, extraSpaces);
if (nodes == null)
continue; // Retry.
assert !nodes.isEmpty();
- if (cctx.isReplicated() || qry.explain()) {
- assert qry.explain() || !nodes.contains(ctx.discovery().localNode()) :
- "We must be on a client node.";
+ if (isReplicatedOnly || qry.explain()) {
+ ClusterNode locNode = ctx.discovery().localNode();
- // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
- nodes = singletonList(F.rand(nodes));
+ // Always prefer local node if possible.
+ if (nodes.contains(locNode))
+ nodes = singletonList(locNode);
+ else {
+ // Select random data node to run query on a replicated data or
+ // get EXPLAIN PLAN from a single node.
+ nodes = singletonList(F.rand(nodes));
+ }
}
}
@@ -553,7 +566,8 @@ public class GridReduceQueryExecutor {
final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
- final int segmentsPerIndex = qry.explain() ? 1 : cctx.config().getQueryParallelism();
+ final int segmentsPerIndex = qry.explain() || isReplicatedOnly ? 1 :
+ findFirstPartitioned(cctx, extraSpaces).config().getQueryParallelism();
int replicatedQrysCnt = 0;
@@ -595,7 +609,7 @@ public class GridReduceQueryExecutor {
r.idxs.add(idx);
}
- r.latch = new CountDownLatch(
+ r.latch = new CountDownLatch(isReplicatedOnly ? 1 :
(r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
runs.put(qryReqId, r);
@@ -616,7 +630,7 @@ public class GridReduceQueryExecutor {
for (GridCacheSqlQuery mapQry : qry.mapQueries())
mapQrys.add(new GridCacheSqlQuery("EXPLAIN " + mapQry.query())
- .parameters(mapQry.parameters(), mapQry.parameterIndexes()));
+ .parameterIndexes(mapQry.parameterIndexes()));
}
final boolean distributedJoins = qry.distributedJoins();
@@ -643,6 +657,9 @@ public class GridReduceQueryExecutor {
if (qry.explain())
flags |= GridH2QueryRequest.FLAG_EXPLAIN;
+ if (isReplicatedOnly)
+ flags |= GridH2QueryRequest.FLAG_REPLICATED;
+
if (send(nodes,
new GridH2QueryRequest()
.requestId(qryReqId)
@@ -652,6 +669,7 @@ public class GridReduceQueryExecutor {
.tables(distributedJoins ? qry.tables() : null)
.partitions(convert(partsMap))
.queries(mapQrys)
+ .parameters(params)
.flags(flags)
.timeout(timeoutMillis),
null,
@@ -723,14 +741,14 @@ public class GridReduceQueryExecutor {
try {
if (qry.explain())
- return explainPlan(r.conn, space, qry);
+ return explainPlan(r.conn, space, qry, params);
GridCacheSqlQuery rdc = qry.reduceQuery();
ResultSet res = h2.executeSqlQueryWithTimer(space,
r.conn,
rdc.query(),
- F.asList(rdc.parameters()),
+ F.asList(rdc.parameters(params)),
false, // The statement will cache some extra thread local objects.
timeoutMillis,
cancel);
@@ -790,6 +808,28 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param cctx Cache context for main space; it is checked first and a {@link CacheException} is thrown if it is LOCAL.
+ * @param extraSpaces Extra spaces, scanned in order only when the main cache is REPLICATED. NOTE(review): not null-checked here - confirm callers never pass {@code null}.
+ * @return The first partitioned cache context ({@link IllegalStateException} if neither the main nor any extra cache is partitioned).
+ */
+ private GridCacheContext<?,?> findFirstPartitioned(GridCacheContext<?,?> cctx, List<Integer> extraSpaces) {
+ if (cctx.isLocal())
+ throw new CacheException("Cache is LOCAL: " + cctx.name());
+
+ if (!cctx.isReplicated())
+ return cctx;
+
+ for (int i = 0 ; i < extraSpaces.size(); i++) {
+ GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+
+ if (!extraCctx.isReplicated() && !extraCctx.isLocal())
+ return extraCctx;
+ }
+
+ throw new IllegalStateException("Failed to find partitioned cache.");
+ }
+
+ /**
* Returns true if the exception is triggered by query cancel.
*
* @param e Exception.
@@ -896,9 +936,19 @@ public class GridReduceQueryExecutor {
* @param extraSpaces Extra spaces.
* @return Collection of all data nodes owning all the caches or {@code null} for retry.
*/
- private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?, ?> cctx,
+ private Collection<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?, ?> cctx,
List<Integer> extraSpaces) {
- assert cctx.isReplicated() : cctx.name() + " must be replicated";
+ int i = 0;
+
+ // The main cache is allowed to be partitioned.
+ if (!cctx.isReplicated()) {
+ assert !F.isEmpty(extraSpaces): "no extra replicated caches with partitioned main cache";
+
+ // Just replace the main cache with the first one extra.
+ cctx = cacheContext(extraSpaces.get(i++));
+
+ assert cctx.isReplicated(): "all the extra caches must be replicated here";
+ }
Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
@@ -906,7 +956,7 @@ public class GridReduceQueryExecutor {
return null; // Retry.
if (!F.isEmpty(extraSpaces)) {
- for (int i = 0; i < extraSpaces.size(); i++) {
+ for (;i < extraSpaces.size(); i++) {
GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
if (extraCctx.isLocal())
@@ -982,9 +1032,12 @@ public class GridReduceQueryExecutor {
* @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
*/
@SuppressWarnings("unchecked")
- private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx,
+ private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(GridCacheContext<?,?> cctx,
List<Integer> extraSpaces) {
- assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
+ assert !cctx.isLocal() : cctx.name() + " must not be LOCAL";
+
+ // If the main cache is replicated, just replace it with the first partitioned.
+ cctx = findFirstPartitioned(cctx, extraSpaces);
final int partsCnt = cctx.affinity().partitions();
@@ -1025,6 +1078,10 @@ public class GridReduceQueryExecutor {
for (int i = 0; i < extraSpaces.size(); i++) {
GridCacheContext<?, ?> extraCctx = cacheContext(extraSpaces.get(i));
+ // This is possible if we have replaced a replicated cache with a partitioned one earlier.
+ if (cctx == extraCctx)
+ continue;
+
if (extraCctx.isReplicated() || extraCctx.isLocal())
continue;
@@ -1093,32 +1150,14 @@ public class GridReduceQueryExecutor {
}
/**
- * @param mainSpace Main space.
- * @param allSpaces All spaces.
- * @return List of all extra spaces or {@code null} if none.
- */
- private List<String> extraSpaces(String mainSpace, Collection<String> allSpaces) {
- if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace)))
- return null;
-
- ArrayList<String> res = new ArrayList<>(allSpaces.size());
-
- for (String space : allSpaces) {
- if (!F.eq(space, mainSpace))
- res.add(space);
- }
-
- return res;
- }
-
- /**
* @param c Connection.
* @param space Space.
* @param qry Query.
+ * @param params Query parameters.
* @return Cursor for plans.
* @throws IgniteCheckedException if failed.
*/
- private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry)
+ private Iterator<List<?>> explainPlan(JdbcConnection c, String space, GridCacheTwoStepQuery qry, Object[] params)
throws IgniteCheckedException {
List<List<?>> lists = new ArrayList<>();
@@ -1142,7 +1181,7 @@ public class GridReduceQueryExecutor {
ResultSet rs = h2.executeSqlQueryWithTimer(space,
c,
"EXPLAIN " + rdc.query(),
- F.asList(rdc.parameters()),
+ F.asList(rdc.parameters(params)),
false,
0,
null);
@@ -1419,29 +1458,4 @@ public class GridReduceQueryExecutor {
state(e, null);
}
}
-
- /**
- *
- */
- private static class Iter extends GridH2ResultSetIterator<List<?>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param data Data array.
- * @throws IgniteCheckedException If failed.
- */
- protected Iter(ResultSet data) throws IgniteCheckedException {
- super(data, true, false);
- }
-
- /** {@inheritDoc} */
- @Override protected List<?> createRow() {
- ArrayList<Object> res = new ArrayList<>(row.length);
-
- Collections.addAll(res, row);
-
- return res;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index f2f9a31..9e7dcbf 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -22,21 +22,27 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
+
/**
* Query request.
*/
@@ -65,6 +71,11 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
*/
public static final int FLAG_EXPLAIN = 1 << 3;
+ /**
+ * If it is a REPLICATED query: set by {@code GridReduceQueryExecutor} when the two-step query is replicated-only.
+ */
+ public static final int FLAG_REPLICATED = 1 << 4;
+
/** */
private long reqId;
@@ -100,6 +111,34 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/** */
private int timeout;
+ /** Query parameters; sent over the wire in serialized form as {@link #paramsBytes}. */
+ @GridToStringInclude(sensitive = true)
+ @GridDirectTransient
+ private Object[] params;
+
+ /** Serialized {@link #params}: filled in by {@link #marshall(Marshaller)}, decoded by {@link #unmarshall(Marshaller, GridKernalContext)}. */
+ private byte[] paramsBytes;
+
+ /**
+ * @return Parameters as set by {@link #parameters(Object[])} or restored by {@link #unmarshall(Marshaller, GridKernalContext)}.
+ */
+ public Object[] parameters() {
+ return params;
+ }
+
+ /**
+ * @param params Parameters; {@code null} is replaced with {@code EMPTY_PARAMS}, so the stored array is never {@code null}.
+ * @return {@code this}.
+ */
+ public GridH2QueryRequest parameters(Object[] params) {
+ if (params == null)
+ params = EMPTY_PARAMS;
+
+ this.params = params;
+
+ return this;
+ }
+
/**
* @param tbls Tables.
* @return {@code this}.
@@ -258,20 +297,38 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/** {@inheritDoc} */
@Override public void marshall(Marshaller m) {
- if (F.isEmpty(qrys))
+ if (paramsBytes != null)
return;
- for (GridCacheSqlQuery qry : qrys)
- qry.marshall(m);
+ assert params != null;
+
+ try {
+ paramsBytes = U.marshal(m, params);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/** {@inheritDoc} */
@Override public void unmarshall(Marshaller m, GridKernalContext ctx) {
- if (F.isEmpty(qrys))
+ if (params != null)
return;
- for (GridCacheSqlQuery qry : qrys)
- qry.unmarshall(m, ctx);
+ assert paramsBytes != null;
+
+ try {
+ final ClassLoader ldr = U.resolveClassLoader(ctx.config());
+
+ if (m instanceof BinaryMarshaller)
+ // Go through the underlying binary marshaller to avoid deserializing enum types.
+ params = ((BinaryMarshaller)m).binaryMarshaller().unmarshal(paramsBytes, ldr);
+ else
+ params = U.unmarshal(m, paramsBytes, ldr);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
}
/** {@inheritDoc} */
@@ -305,31 +362,31 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
writer.incrementState();
case 3:
- if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
+ if (!writer.writeByteArray("paramsBytes", paramsBytes))
return false;
writer.incrementState();
case 4:
- if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
+ if (!writer.writeMap("parts", parts, MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR))
return false;
writer.incrementState();
case 5:
- if (!writer.writeLong("reqId", reqId))
+ if (!writer.writeCollection("qrys", qrys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 6:
- if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING))
+ if (!writer.writeLong("reqId", reqId))
return false;
writer.incrementState();
case 7:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeCollection("tbls", tbls, MessageCollectionItemType.STRING))
return false;
writer.incrementState();
@@ -339,6 +396,13 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
return false;
writer.incrementState();
+
+ case 9:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -377,7 +441,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 3:
- parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
+ paramsBytes = reader.readByteArray("paramsBytes");
if (!reader.isLastRead())
return false;
@@ -385,7 +449,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 4:
- qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
+ parts = reader.readMap("parts", MessageCollectionItemType.UUID, MessageCollectionItemType.INT_ARR, false);
if (!reader.isLastRead())
return false;
@@ -393,7 +457,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 5:
- reqId = reader.readLong("reqId");
+ qrys = reader.readCollection("qrys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -401,7 +465,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 6:
- tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING);
+ reqId = reader.readLong("reqId");
if (!reader.isLastRead())
return false;
@@ -409,7 +473,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
reader.incrementState();
case 7:
- topVer = reader.readMessage("topVer");
+ tbls = reader.readCollection("tbls", MessageCollectionItemType.STRING);
if (!reader.isLastRead())
return false;
@@ -423,6 +487,15 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
return false;
reader.incrementState();
+
+ case 9:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridH2QueryRequest.class);
@@ -435,7 +508,7 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 9;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
index 1740fe9..5cb86b1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractFieldsQuerySelfTest.java
@@ -361,7 +361,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
*
*/
public void testExplain() {
- List<List<?>> res = grid(0).cache(personCache.getName()).query(new SqlFieldsQuery(
+ List<List<?>> res = grid(0).cache(personCache.getName()).query(sqlFieldsQuery(
String.format("explain select p.age, p.name, o.name " +
"from \"%s\".Person p, \"%s\".Organization o where p.orgId = o.id",
personCache.getName(), orgCache.getName()))).getAll();
@@ -369,7 +369,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
for (List<?> row : res)
X.println("____ : " + row);
- if (cacheMode() == PARTITIONED) {
+ if (cacheMode() == PARTITIONED || !isReplicatedOnly()) {
assertEquals(2, res.size());
assertTrue(((String)res.get(1).get(0)).contains(GridSqlQuerySplitter.mergeTableIdentifier(0)));
@@ -380,7 +380,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
/** @throws Exception If failed. */
public void testExecuteWithMetaData() throws Exception {
- QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)personCache.query(new SqlFieldsQuery(
+ QueryCursorImpl<List<?>> cursor = (QueryCursorImpl<List<?>>)personCache.query(sqlFieldsQuery(
String.format("select p._KEY, p.name, p.age, o.name " +
"from \"%s\".Person p, \"%s\".Organization o where p.orgId = o.id",
personCache.getName(), orgCache.getName())));
@@ -480,7 +480,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
/** @throws Exception If failed. */
public void testExecute() throws Exception {
- QueryCursor<List<?>> qry = personCache.query(new SqlFieldsQuery("select _KEY, name, age from Person"));
+ QueryCursor<List<?>> qry = personCache.query(sqlFieldsQuery("select _KEY, name, age from Person"));
List<List<?>> res = new ArrayList<>(qry.getAll());
@@ -526,7 +526,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
/** @throws Exception If failed. */
public void testExecuteWithArguments() throws Exception {
QueryCursor<List<?>> qry = personCache
- .query(new SqlFieldsQuery("select _KEY, name, age from Person where age > ?").setArgs(30));
+ .query(sqlFieldsQuery("select _KEY, name, age from Person where age > ?").setArgs(30));
List<List<?>> res = new ArrayList<>(qry.getAll());
@@ -564,10 +564,23 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
assert cnt == 2;
}
+ protected boolean isReplicatedOnly() {
+ return false;
+ }
+
+ private SqlFieldsQuery sqlFieldsQuery(String sql) {
+ SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+ if (isReplicatedOnly())
+ qry.setReplicatedOnly(true);
+
+ return qry;
+ }
+
/** @throws Exception If failed. */
public void testSelectAllJoined() throws Exception {
QueryCursor<List<?>> qry =
- personCache.query(new SqlFieldsQuery(
+ personCache.query(sqlFieldsQuery(
String.format("select * from \"%s\".Person p, \"%s\".Organization o where p.orgId = o.id",
personCache.getName(), orgCache.getName())));
@@ -631,7 +644,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
/** @throws Exception If failed. */
public void testEmptyResult() throws Exception {
QueryCursor<List<?>> qry =
- personCache.query(new SqlFieldsQuery("select name from Person where age = 0"));
+ personCache.query(sqlFieldsQuery("select name from Person where age = 0"));
Collection<List<?>> res = qry.getAll();
@@ -641,7 +654,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
/** @throws Exception If failed. */
public void testQueryString() throws Exception {
- QueryCursor<List<?>> qry = strCache.query(new SqlFieldsQuery("select * from String"));
+ QueryCursor<List<?>> qry = strCache.query(sqlFieldsQuery("select * from String"));
Collection<List<?>> res = qry.getAll();
@@ -658,7 +671,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
/** @throws Exception If failed. */
public void testQueryIntegersWithJoin() throws Exception {
- QueryCursor<List<?>> qry = intCache.query(new SqlFieldsQuery(
+ QueryCursor<List<?>> qry = intCache.query(sqlFieldsQuery(
"select i._KEY, i._VAL, j._KEY, j._VAL from Integer i join Integer j where i._VAL >= 100"));
Collection<List<?>> res = qry.getAll();
@@ -682,7 +695,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
public void testPagination() throws Exception {
// Query with page size 20.
QueryCursor<List<?>> qry =
- intCache.query(new SqlFieldsQuery("select * from Integer").setPageSize(20));
+ intCache.query(sqlFieldsQuery("select * from Integer").setPageSize(20));
List<List<?>> res = new ArrayList<>(qry.getAll());
@@ -708,7 +721,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
for (int i = 0; i < 200; i++)
cache.put(i, i);
- QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery("select * from Integer"));
+ QueryCursor<List<?>> qry = cache.query(sqlFieldsQuery("select * from Integer"));
Collection<List<?>> res = qry.getAll();
@@ -729,7 +742,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
GridTestUtils.assertThrows(log, new Callable<Object>() {
@Override public Object call() throws Exception {
- return cache.query(new SqlFieldsQuery("select * from String"));
+ return cache.query(sqlFieldsQuery("select * from String"));
}
}, CacheException.class, null);
}
@@ -749,7 +762,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
cache.put(key, val);
- Collection<List<?>> res = cache.query(new SqlFieldsQuery("select * from Person")).getAll();
+ Collection<List<?>> res = cache.query(sqlFieldsQuery("select * from Person")).getAll();
assertEquals(1, res.size());
@@ -782,7 +795,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
*/
public void testPaginationIterator() throws Exception {
QueryCursor<List<?>> qry =
- intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
+ intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
int cnt = 0;
@@ -802,7 +815,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
/** @throws Exception If failed. */
public void testPaginationIteratorKeepAll() throws Exception {
QueryCursor<List<?>> qry =
- intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
+ intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
int cnt = 0;
@@ -818,7 +831,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
assertEquals(size, cnt);
- qry = intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
+ qry = intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
List<List<?>> list = new ArrayList<>(qry.getAll());
@@ -844,7 +857,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
public void testMethodAnnotationWithoutGet() throws Exception {
if (!binaryMarshaller) {
QueryCursor<List<?>> qry =
- orgCache.query(new SqlFieldsQuery("select methodField from Organization where methodField='name-A'")
+ orgCache.query(sqlFieldsQuery("select methodField from Organization where methodField='name-A'")
.setPageSize(10));
List<List<?>> flds = qry.getAll();
@@ -860,7 +873,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
*/
public void testPaginationGet() throws Exception {
QueryCursor<List<?>> qry =
- intCache.query(new SqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
+ intCache.query(sqlFieldsQuery("select _key, _val from Integer").setPageSize(10));
List<List<?>> list = new ArrayList<>(qry.getAll());
@@ -883,7 +896,7 @@ public abstract class IgniteCacheAbstractFieldsQuerySelfTest extends GridCommonA
/** @throws Exception If failed. */
public void testEmptyGrid() throws Exception {
QueryCursor<List<?>> qry = personCache
- .query(new SqlFieldsQuery("select name, age from Person where age = 25"));
+ .query(sqlFieldsQuery("select name, age from Person where age = 25"));
List<?> res = F.first(qry.getAll());
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
index aa31f33..d4772c1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
@@ -208,6 +208,16 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr
"from \"orgRepl\".Organization o left join \"person\".Person p " +
"on (p.orgId = o.id)", orgCacheRepl, 2);
+ // Left join from replicated to partitioned cache is not supported:
+ // returns duplicates in result and must fail.
+ checkQueryFails("select o.name, p._key, p.name " +
+ "from \"person\".Person p left join \"org\".Organization o " +
+ "on (p.orgId = o.id)", orgCache);
+
+ checkQueryFails("select o.name, p._key, p.name " +
+ "from \"org\".Organization o right join \"person\".Person p " +
+ "on (p.orgId = o.id)", orgCache);
+
checkQueryFails("select o.name, p._key, p.name " +
"from \"person\".Person p left join \"org\".Organization o " +
"on (p.orgId = o.id)", personCache);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java
new file mode 100644
index 0000000..44a68e2
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/IgniteCacheReplicatedFieldsQueryROSelfTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.replicated;
+
+/**
+ */
+public class IgniteCacheReplicatedFieldsQueryROSelfTest extends IgniteCacheReplicatedFieldsQuerySelfTest {
+ /** */
+ @Override protected boolean isReplicatedOnly() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ded599ae/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index f093ac7..b180eba 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
@@ -61,6 +62,9 @@ import org.springframework.util.StringUtils;
@SuppressWarnings("unchecked")
public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
/** */
+ private static final int CLIENT = 7;
+
+ /** */
private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** {@inheritDoc} */
@@ -82,14 +86,16 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
return cfg;
}
- @Override
- protected long getTestTimeout() {
- return 100_000_000;
- }
-
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGridsMultiThreaded(3, false);
+ Ignition.setClientMode(true);
+ try {
+ startGrid(CLIENT);
+ }
+ finally {
+ Ignition.setClientMode(false);
+ }
}
/** {@inheritDoc} */
@@ -156,22 +162,69 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
/**
*/
- public void testReplicatedOnlyTables() {
- doTestReplicatedOnlyTables(1);
+ public void testReplicatedTablesUsingPartitionedCache() {
+ doTestReplicatedTablesUsingPartitionedCache(1, false, false);
+ }
+
+ /**
+ */
+ public void testReplicatedTablesUsingPartitionedCacheSegmented() {
+ doTestReplicatedTablesUsingPartitionedCache(5, false, false);
+ }
+
+ /**
+ */
+ public void testReplicatedTablesUsingPartitionedCacheClient() {
+ doTestReplicatedTablesUsingPartitionedCache(1, true, false);
+ }
+
+ /**
+ */
+ public void testReplicatedTablesUsingPartitionedCacheSegmentedClient() {
+ doTestReplicatedTablesUsingPartitionedCache(5, true, false);
}
/**
*/
- public void testReplicatedOnlyTablesSegmented() {
- doTestReplicatedOnlyTables(5);
+ public void testReplicatedTablesUsingPartitionedCacheRO() {
+ doTestReplicatedTablesUsingPartitionedCache(1, false, true);
}
/**
*/
- private void doTestReplicatedOnlyTables(int segments) {
- IgniteCache<Integer,Value> p = ignite(0).getOrCreateCache(cacheConfig("p", true,
+ public void testReplicatedTablesUsingPartitionedCacheSegmentedRO() {
+ doTestReplicatedTablesUsingPartitionedCache(5, false, true);
+ }
+
+ /**
+ */
+ public void testReplicatedTablesUsingPartitionedCacheClientRO() {
+ doTestReplicatedTablesUsingPartitionedCache(1, true, true);
+ }
+
+ /**
+ */
+ public void testReplicatedTablesUsingPartitionedCacheSegmentedClientRO() {
+ doTestReplicatedTablesUsingPartitionedCache(5, true, true);
+ }
+
+ /**
+ */
+ private SqlFieldsQuery query(String sql, boolean replicatedOnly) {
+ SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+ if (replicatedOnly)
+ qry.setReplicatedOnly(true);
+
+ return qry;
+ }
+
+ /**
+ */
+ private void doTestReplicatedTablesUsingPartitionedCache(int segments, boolean client, boolean replicatedOnlyFlag) {
+ IgniteCache<Integer,Value> p = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("p", true,
Integer.class, Value.class).setQueryParallelism(segments));
- IgniteCache<Integer,Value> r = ignite(0).getOrCreateCache(cacheConfig("r", false,
+ IgniteCache<Integer,Value> r = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("r", false,
Integer.class, Value.class));
try {
@@ -181,9 +234,53 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
r.put(i, new Value(i, -i));
// Query data from replicated table using partitioned cache.
- assertEquals(cnt, p.query(new SqlFieldsQuery("select 1 from \"r\".Value")).getAll().size());
+ assertEquals(cnt, p.query(query("select 1 from \"r\".Value", replicatedOnlyFlag))
+ .getAll().size());
+
+ List<List<?>> res = p.query(query("select count(1) from \"r\".Value", replicatedOnlyFlag)).getAll();
+ assertEquals(1, res.size());
+ assertEquals(cnt, ((Number)res.get(0).get(0)).intValue());
+ }
+ finally {
+ p.destroy();
+ r.destroy();
+ }
+ }
+
+ public void testPartitionedTablesUsingReplicatedCache() {
+ doTestPartitionedTablesUsingReplicatedCache(1, false);
+ }
+
+ public void testPartitionedTablesUsingReplicatedCacheSegmented() {
+ doTestPartitionedTablesUsingReplicatedCache(7, false);
+ }
+
+ public void testPartitionedTablesUsingReplicatedCacheClient() {
+ doTestPartitionedTablesUsingReplicatedCache(1, true);
+ }
+
+ public void testPartitionedTablesUsingReplicatedCacheSegmentedClient() {
+ doTestPartitionedTablesUsingReplicatedCache(7, true);
+ }
+
+ /**
+ */
+ private void doTestPartitionedTablesUsingReplicatedCache(int segments, boolean client) {
+ IgniteCache<Integer,Value> p = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("p", true,
+ Integer.class, Value.class).setQueryParallelism(segments));
+ IgniteCache<Integer,Value> r = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("r", false,
+ Integer.class, Value.class));
+
+ try {
+ int cnt = 1000;
+
+ for (int i = 0; i < cnt; i++)
+ p.put(i, new Value(i, -i));
+
+ // Query data from partitioned table using replicated cache.
+ assertEquals(cnt, r.query(new SqlFieldsQuery("select 1 from \"p\".Value")).getAll().size());
- List<List<?>> res = p.query(new SqlFieldsQuery("select count(1) from \"r\".Value")).getAll();
+ List<List<?>> res = r.query(new SqlFieldsQuery("select count(1) from \"p\".Value")).getAll();
assertEquals(1, res.size());
assertEquals(cnt, ((Number)res.get(0).get(0)).intValue());
}