You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/06/30 16:51:51 UTC
[1/2] ignite git commit: ignite-1232 Added tests.
Repository: ignite
Updated Branches:
refs/heads/ignite-1232 8ab0608cf -> 407b306d5
ignite-1232 Added tests.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbfef04a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbfef04a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbfef04a
Branch: refs/heads/ignite-1232
Commit: bbfef04a17fee5c0c3c7d0b29a4386c01ac91674
Parents: d51d8b2
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 30 19:43:58 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 30 19:51:09 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 1 -
.../messages/GridQueryNextPageResponse.java | 12 +-
.../junits/common/GridCommonAbstractTest.java | 16 +
.../processors/query/h2/IgniteH2Indexing.java | 31 +-
.../query/h2/opt/GridH2RowFactory.java | 8 +
.../query/h2/opt/GridH2TreeIndex.java | 28 +-
.../processors/query/h2/sql/GridSqlTable.java | 10 +-
.../h2/twostep/GridReduceQueryExecutor.java | 8 +-
.../query/h2/twostep/msg/GridH2Boolean.java | 1 +
.../query/h2/twostep/msg/GridH2RowRange.java | 8 +
.../IgniteCacheDistributedJoinQueryTest.java | 517 +++++++++++++++++++
.../cache/IgniteCacheJoinNoIndexTest.java | 248 +++++++++
...teCacheJoinPartitionedAndReplicatedTest.java | 314 +++++++++++
.../query/IgniteSqlSplitterSelfTest.java | 25 +
14 files changed, 1186 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 6b25649..8b0465f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -90,7 +90,6 @@ import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P_POOL;
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index b220291..087d5e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -55,6 +56,7 @@ public class GridQueryNextPageResponse implements Message {
/** */
@GridDirectCollection(Message.class)
+ @GridToStringInclude
private Collection<Message> vals;
/** */
@@ -144,11 +146,6 @@ public class GridQueryNextPageResponse implements Message {
}
/** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridQueryNextPageResponse.class, this);
- }
-
- /** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
}
@@ -304,4 +301,9 @@ public class GridQueryNextPageResponse implements Message {
public void retry(AffinityTopologyVersion retry) {
this.retry = retry;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridQueryNextPageResponse.class, this);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 6913539..6b60a4b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
import javax.cache.CacheException;
@@ -1169,4 +1170,19 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
}
}
+
+ /**
+ * @param aff Affinity.
+ * @param key Counter.
+ * @param node Target node.
+ * @return Key.
+ */
+ protected final Integer keyForNode(Affinity<Object> aff, AtomicInteger key, ClusterNode node) {
+ while (true) {
+ Integer next = key.getAndIncrement();
+
+ if (aff.mapKeyToNode(next).equals(node))
+ return next;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index ed47d70..dc90df7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1096,12 +1096,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
final boolean enforceJoinOrder = qry.isEnforceJoinOrder();
final boolean distributedJoins = qry.isDistributedJoins() && isPartitioned(cctx);
- final boolean groupByCollocated = qry.isCollocated();
+ final boolean grpByCollocated = qry.isCollocated();
GridCacheTwoStepQuery twoStepQry;
List<GridQueryFieldMetadata> meta;
- final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, groupByCollocated,
+ final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, grpByCollocated,
distributedJoins, enforceJoinOrder);
TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
@@ -1152,7 +1152,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
try {
bindParameters(stmt, F.asList(qry.getArgs()));
- twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), groupByCollocated,
+ twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), grpByCollocated,
distributedJoins);
// Setup spaces from schemas.
@@ -1961,7 +1961,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private final String sql;
/** */
- private final boolean groupByCollocated;
+ private final boolean grpByCollocated;
/** */
private final boolean distributedJoins;
@@ -1972,15 +1972,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/**
* @param space Space.
* @param sql Sql.
- * @param groupByCollocated Collocated GROUP BY.
+ * @param grpByCollocated Collocated GROUP BY.
* @param distributedJoins Distributed joins enabled.
* @param enforceJoinOrder Enforce join order of tables.
*/
- private TwoStepCachedQueryKey(String space, String sql, boolean groupByCollocated, boolean distributedJoins,
+ private TwoStepCachedQueryKey(String space,
+ String sql,
+ boolean grpByCollocated,
+ boolean distributedJoins,
boolean enforceJoinOrder) {
this.space = space;
this.sql = sql;
- this.groupByCollocated = groupByCollocated;
+ this.grpByCollocated = grpByCollocated;
this.distributedJoins = distributedJoins;
this.enforceJoinOrder = enforceJoinOrder;
}
@@ -1995,7 +1998,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
TwoStepCachedQueryKey that = (TwoStepCachedQueryKey)o;
- if (groupByCollocated != that.groupByCollocated)
+ if (grpByCollocated != that.grpByCollocated)
return false;
if (distributedJoins != that.distributedJoins)
@@ -2012,13 +2015,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public int hashCode() {
- int result = space != null ? space.hashCode() : 0;
- result = 31 * result + sql.hashCode();
- result = 31 * result + (groupByCollocated ? 1 : 0);
- result = 31 * result + (distributedJoins ? 1 : 0);
- result = 31 * result + (enforceJoinOrder ? 1 : 0);
+ int res = space != null ? space.hashCode() : 0;
+ res = 31 * res + sql.hashCode();
+ res = 31 * res + (grpByCollocated ? 1 : 0);
+ res = 31 * res + (distributedJoins ? 1 : 0);
+ res = 31 * res + (enforceJoinOrder ? 1 : 0);
- return result;
+ return res;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java
index 148fab8..00ff3f2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowFactory.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.query.h2.opt;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.h2.result.RowFactory;
import org.h2.value.Value;
@@ -144,6 +146,7 @@ public class GridH2RowFactory extends RowFactory {
*/
private static final class RowSimple extends GridH2Row {
/** */
+ @GridToStringInclude
private Value[] vals;
/**
@@ -167,5 +170,10 @@ public class GridH2RowFactory extends RowFactory {
@Override public void setValue(int idx, Value v) {
vals[idx] = v;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RowSimple.class, this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 31e9408..00606b2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -115,7 +115,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
private final boolean snapshotEnabled;
/** */
- private final CIX2<ClusterNode,Message> locNodeHandler = new CIX2<ClusterNode,Message>() {
+ private final CIX2<ClusterNode,Message> locNodeHnd = new CIX2<ClusterNode,Message>() {
@Override public void applyx(ClusterNode clusterNode, Message msg) throws IgniteCheckedException {
onMessage0(clusterNode.id(), msg);
}
@@ -238,7 +238,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
* @param msg Message.
*/
private void send(Collection<ClusterNode> nodes, Message msg) {
- if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHandler,
+ if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHnd,
GridIoPolicy.IDX_POOL, false))
throw new GridH2RetryException("Failed to send message to nodes: " + nodes + ".");
}
@@ -296,7 +296,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
if (msg.bounds() != null) {
// This is the first request containing all the search rows.
- ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> snapshot0 = qctx.getSnapshot(idxId);
+ ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> snapshot0 = qctx.getSnapshot(idxId);
assert !msg.bounds().isEmpty() : "empty bounds";
@@ -785,7 +785,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
* @return Collection of nodes for broadcasting.
*/
private List<ClusterNode> broadcastNodes(GridH2QueryContext qctx, GridCacheContext<?,?> cctx) {
- Map<UUID,int[]> partMap = qctx.partitionsMap();
+ Map<UUID, int[]> partMap = qctx.partitionsMap();
List<ClusterNode> res;
@@ -1140,7 +1140,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
final GridCacheContext<?,?> cctx;
/** */
- final boolean unicast;
+ final boolean ucast;
/** */
final int affColId;
@@ -1168,12 +1168,12 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
/**
* @param cctx Cache Cache context.
- * @param unicast Unicast or broadcast query.
+ * @param ucast Unicast or broadcast query.
* @param affColId Affinity column ID.
*/
- private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean unicast, int affColId) {
+ private DistributedLookupBatch(GridCacheContext<?,?> cctx, boolean ucast, int affColId) {
this.cctx = cctx;
- this.unicast = unicast;
+ this.ucast = ucast;
this.affColId = affColId;
}
@@ -1212,7 +1212,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
try {
pkAffKeyFirst = ctx.affinity().affinityKey(cctx.name(), pkFirst.getObject());
- pkAffKeyLast = ctx.affinity().affinityKey(cctx.name(), pkFirst.getObject());
+ pkAffKeyLast = ctx.affinity().affinityKey(cctx.name(), pkLast.getObject());
}
catch (IgniteCheckedException e) {
throw new CacheException(e);
@@ -1228,6 +1228,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
}
/** {@inheritDoc} */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
@Override public boolean addSearchRows(SearchRow firstRow, SearchRow lastRow) {
if (findCalled) {
findCalled = false;
@@ -1313,6 +1314,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
return batchFull;
}
+ /**
+ *
+ */
private void startStreams() {
if (rangeStreams.isEmpty()) {
assert res.isEmpty();
@@ -1338,8 +1342,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
}
/** {@inheritDoc} */
- @Override public void reset(boolean beforeQuery) {
- if (beforeQuery) {
+ @Override public void reset(boolean beforeQry) {
+ if (beforeQry) {
qctx = GridH2QueryContext.get();
batchLookupId = qctx.nextBatchLookupId();
rangeStreams = new HashMap<>();
@@ -1355,7 +1359,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
/** {@inheritDoc} */
@Override public String getPlanSQL() {
- return unicast ? "unicast" : "broadcast";
+ return ucast ? "unicast" : "broadcast";
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
index 59f14cf..49c679d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlTable.java
@@ -37,7 +37,7 @@ public class GridSqlTable extends GridSqlElement {
private final GridH2Table tbl;
/** */
- private boolean affKeyCondition;
+ private boolean affKeyCond;
/**
* @param schema Schema.
@@ -72,17 +72,17 @@ public class GridSqlTable extends GridSqlElement {
}
/**
- * @param affKeyCondition If affinity key condition is found.
+ * @param affKeyCond If affinity key condition is found.
*/
- public void affinityKeyCondition(boolean affKeyCondition) {
- this.affKeyCondition = affKeyCondition;
+ public void affinityKeyCondition(boolean affKeyCond) {
+ this.affKeyCond = affKeyCond;
}
/**
* @return {@code true} If affinity key condition is found.
*/
public boolean affinityKeyCondition() {
- return affKeyCondition;
+ return affKeyCond;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 530f3ad..da029c7 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -480,7 +480,7 @@ public class GridReduceQueryExecutor {
if (!F.isEmpty(m)) {
for (Map.Entry<IgniteProductVersion,Collection<ClusterNode>> entry : m.entrySet()) {
if (entry.getKey().compareTo(DISTRIBUTED_JOIN_SINCE) >= 0)
- break;
+ break;
for (ClusterNode node : entry.getValue()) {
if (!node.isClient() && !node.isDaemon())
@@ -533,7 +533,7 @@ public class GridReduceQueryExecutor {
Collection<ClusterNode> nodes;
// Explicit partition mapping for unstable topology.
- Map<ClusterNode,IntArray> partsMap = null;
+ Map<ClusterNode, IntArray> partsMap = null;
if (isPreloadingActive(cctx, extraSpaces)) {
if (cctx.isReplicated())
@@ -1192,7 +1192,7 @@ public class GridReduceQueryExecutor {
* @param m Map.
* @return Converted map.
*/
- private static Map<UUID,int[]> convert(Map<ClusterNode,IntArray> m) {
+ private static Map<UUID,int[]> convert(Map<ClusterNode, IntArray> m) {
if (m == null)
return null;
@@ -1209,7 +1209,7 @@ public class GridReduceQueryExecutor {
* @param qry Query.
* @param explain Explain.
* @return Table.
- * @throws IgniteCheckedException
+ * @throws IgniteCheckedException If failed.
*/
private GridMergeTable createMergeTable(JdbcConnection conn, GridCacheSqlQuery qry, boolean explain)
throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
index 5681a66..edd404e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2Boolean.java
@@ -107,6 +107,7 @@ public class GridH2Boolean extends GridH2ValueMessage {
return -5;
}
+ /** {@inheritDoc} */
@Override public byte fieldsCount() {
return 1;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java
index 374e4b8..6ebc11d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2RowRange.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.query.h2.twostep.msg;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -37,6 +39,7 @@ public class GridH2RowRange implements Message {
/** */
@GridDirectCollection(Message.class)
+ @GridToStringInclude
private List<GridH2RowMessage> rows;
/** */
@@ -170,4 +173,9 @@ public class GridH2RowRange implements Message {
@Override public void onAckReceived() {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridH2RowRange.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
new file mode 100644
index 0000000..700a2c4
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinQueryTest.java
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheDistributedJoinQueryTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String PERSON_CACHE = "person";
+
+ /** */
+ private static final String ORG_CACHE = "org";
+
+ /** */
+ private boolean client;
+
+ /** */
+ private int total;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // Point discovery at the IP finder shared by all nodes of this test class.
+        ((TcpDiscoverySpi)igniteCfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        // Client or server mode follows the value of the flag at the moment the node is configured.
+        igniteCfg.setClientMode(client);
+
+        return igniteCfg;
+    }
+
+ /** {@inheritDoc} Brings up the topology used by the tests: two grids first, then grid 2. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(2); // Started while the client flag still has its default value (false).
+
+ client = true; // getConfiguration() hands this flag to setClientMode() from here on.
+
+ startGrid(2); // The tests obtain this node via grid(2) and refer to it as "client".
+ }
+
+ /** {@inheritDoc} Stops all grids before delegating to the superclass. */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+    /**
+     * Joins organizations with persons, first on the organization key and then on the name column,
+     * each combined with equality, IN and range filters on the organization side.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery1() throws Exception {
+        Ignite client = grid(2);
+
+        // Every statement below shares the projection and table list; only the predicates differ.
+        final String selectFrom = "select o._key, o.name, p._key, p.name " +
+            "from \"org\".Organization o, Person p ";
+
+        final String joinByOrgId = selectFrom + "where p.orgId = o._key";
+        final String joinByName = selectFrom + "where p.name = o.name";
+
+        try {
+            CacheConfiguration personCcfg =
+                cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
+            CacheConfiguration orgCcfg =
+                cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(personCcfg);
+
+            // The organization cache only has to exist; queries are issued through the person cache.
+            client.createCache(orgCcfg);
+
+            List<Integer> orgIds = putData1();
+
+            // Join on the organization key.
+            checkQuery(joinByOrgId, pCache, total);
+
+            checkQuery(joinByOrgId + " and o._key=" + orgIds.get(3), pCache, 3);
+
+            String orgIdIn = joinByOrgId + " and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")";
+
+            // Same statement executed twice.
+            checkQuery(orgIdIn, pCache, 5);
+            checkQuery(orgIdIn, pCache, 5);
+
+            checkQuery(joinByOrgId + " and o._key > " + orgIds.get(2), pCache, total - 3);
+
+            checkQuery(joinByOrgId + " and o._key > " + orgIds.get(1) + " and o._key < " + orgIds.get(4),
+                pCache,
+                5);
+
+            // Join on the name column.
+            checkQuery(joinByName, pCache, total);
+
+            checkQuery(joinByName + " and o._key=" + orgIds.get(0), pCache, 0);
+
+            checkQuery(joinByName + " and o._key=" + orgIds.get(3), pCache, 3);
+
+            checkQuery(joinByName + " and o._key IN (" + orgIds.get(2) + "," + orgIds.get(3) + ")", pCache, 5);
+
+            checkQuery(joinByName + " and o.name='obj-" + orgIds.get(3) + "'", pCache, 3);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * Puts three organizations under keys mapped to node 0 and their three persons under keys mapped
+     * to node 1, then joins them on the organization key while filtering by person key
+     * (range, equality, IN).
+     *
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery2() throws Exception {
+        Ignite client = grid(2);
+
+        final String joinByOrgId = "select o._key, o.name, p._key, p.name " +
+            "from \"org\".Organization o, Person p " +
+            "where p.orgId = o._key";
+
+        try {
+            CacheConfiguration personCcfg =
+                cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, true)));
+            CacheConfiguration orgCcfg =
+                cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(personCcfg);
+            IgniteCache<Object, Object> orgCache = client.createCache(orgCcfg);
+
+            ClusterNode orgNode = ignite(0).cluster().localNode();
+            ClusterNode personNode = ignite(1).cluster().localNode();
+
+            // The person cache affinity is used to pick keys for both caches.
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger orgKeyCntr = new AtomicInteger();
+            AtomicInteger personKeyCntr = new AtomicInteger();
+
+            List<Integer> personIds = new ArrayList<>();
+
+            for (int n = 0; n < 3; n++) {
+                Integer orgId = keyForNode(aff, orgKeyCntr, orgNode);
+
+                orgCache.put(orgId, new Organization("org-" + orgId));
+
+                Integer personId = keyForNode(aff, personKeyCntr, personNode);
+
+                pCache.put(personId, new Person(orgId, "p-" + orgId));
+
+                personIds.add(personId);
+            }
+
+            checkQuery(joinByOrgId + " and p._key >= 0", pCache, 3);
+
+            checkQuery(joinByOrgId + " and p._key=" + personIds.get(0), pCache, 1);
+
+            checkQuery(joinByOrgId + " and p._key in (" + personIds.get(0) + ", " + personIds.get(1) + ")",
+                pCache,
+                2);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+    /**
+     * Checks a non-equality join ({@code p.orgId != o._key}) between three organizations stored under
+     * keys mapped to node 0 and three persons stored under keys mapped to node 1.
+     * <p>
+     * Each person's {@code orgId} is its organization's key shifted by 100 000, so it equals none of
+     * the three organization keys and every person pairs with every organization (3 x 3 = 9 rows
+     * without a person filter).
+     *
+     * @throws Exception If failed.
+     */
+    public void testJoinQuery3() throws Exception {
+        Ignite client = grid(2);
+
+        try {
+            CacheConfiguration ccfg1 =
+                cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
+            CacheConfiguration ccfg2 =
+                cacheConfiguration(ORG_CACHE).setQueryEntities(F.asList(organizationEntity(false)));
+
+            IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+            IgniteCache<Object, Object> orgCache = client.createCache(ccfg2);
+
+            ClusterNode node0 = ignite(0).cluster().localNode();
+            ClusterNode node1 = ignite(1).cluster().localNode();
+
+            // The person cache affinity is used to pick keys for both caches.
+            Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+            AtomicInteger orgKey = new AtomicInteger();
+            AtomicInteger pKey = new AtomicInteger();
+
+            List<Integer> pIds = new ArrayList<>();
+
+            for (int i = 0; i < 3; i++) {
+                Integer orgId = keyForNode(aff, orgKey, node0);
+
+                orgCache.put(orgId, new Organization("org-" + orgId));
+
+                Integer pId = keyForNode(aff, pKey, node1);
+
+                // Shifted orgId: never equal to any of the organization keys created above.
+                pCache.put(pId, new Person(orgId + 100_000, "p-" + orgId));
+
+                pIds.add(pId);
+            }
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key", pCache, 9);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key and p._key=" + pIds.get(0), pCache, 3);
+
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key and p._key in (" + pIds.get(0) + ", " + pIds.get(1) + ")", pCache, 6);
+
+            // Keep the spaces around the concatenated key: without them the SQL reads
+            // "p._key >=<id>and ...", which depends on the lexer splitting a number from a keyword.
+            checkQuery("select o._key, o.name, p._key, p.name " +
+                "from \"org\".Organization o, Person p " +
+                "where p.orgId != o._key and p._key >= " + pIds.get(0) + " and p._key <= " + pIds.get(2),
+                pCache,
+                9);
+        }
+        finally {
+            client.destroyCache(PERSON_CACHE);
+            client.destroyCache(ORG_CACHE);
+        }
+    }
+
+ /**
+ * Checks distributed self-joins of the person cache using key and name comparisons.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoinQuery4() throws Exception {
+ Ignite client = grid(2);
+
+ try {
+ CacheConfiguration ccfg1 =
+ cacheConfiguration(PERSON_CACHE).setQueryEntities(F.asList(personEntity(false, false)));
+
+ IgniteCache<Object, Object> pCache = client.createCache(ccfg1);
+
+ ClusterNode node0 = ignite(0).cluster().localNode();
+ ClusterNode node1 = ignite(1).cluster().localNode();
+
+ Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+ AtomicInteger pKey = new AtomicInteger();
+
+ // One person named "p0" whose key is requested for node0.
+ // NOTE(review): the expected sizes below assume keyForNode hands out increasing keys,
+ // so that pId0 is the smallest person key - confirm.
+ Integer pId0 = keyForNode(aff, pKey, node0);
+
+ pCache.put(pId0, new Person(0, "p0"));
+
+ // Three persons named "p" whose keys are requested for node1.
+ for (int i = 0; i < 3; i++) {
+ Integer pId = keyForNode(aff, pKey, node1);
+
+ pCache.put(pId, new Person(0, "p"));
+ }
+
+ // Four persons with distinct keys: 6 ordered pairs are expected to satisfy p2._key > p1._key.
+ checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+ "from Person p1, Person p2 " +
+ "where p2._key > p1._key", pCache, 6);
+
+ checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+ "from Person p1, Person p2 " +
+ "where p2._key > p1._key and p1._key=" + pId0, pCache, 3);
+
+ checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+ "from Person p1, Person p2 " +
+ "where p2._key > p1._key and p1.name='p0'", pCache, 3);
+
+ // Names differ only between "p0" and each of the three "p" persons: 3 pairs in each direction.
+ checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+ "from Person p1, Person p2 " +
+ "where p1.name != p2.name", pCache, 6);
+
+ checkQuery("select p1._key, p1.name, p2._key, p2.name " +
+ "from Person p1, Person p2 " +
+ "where p1.name > p2.name", pCache, 3);
+ }
+ finally {
+ client.destroyCache(PERSON_CACHE);
+ // NOTE(review): ORG_CACHE is not created by this test; presumably destroying a missing cache
+ // is a no-op - confirm.
+ client.destroyCache(ORG_CACHE);
+ }
+ }
+
+ /**
+ * Logs the query and verifies its result size twice: first without and then with
+ * enforced join order.
+ *
+ * @param sql SQL.
+ * @param cache Cache.
+ * @param expSize Expected results size.
+ * @param args Arguments.
+ */
+ private void checkQuery(String sql, IgniteCache<Object, Object> cache, int expSize, Object... args) {
+ log.info("Execute query: " + sql);
+
+ // First run without enforced join order, second run with it.
+ for (boolean enforceJoinOrder : new boolean[] {false, true})
+ checkQuery(sql, cache, enforceJoinOrder, expSize, args);
+ }
+
+ /**
+ * Logs the execution plan of the query, then runs the query with distributed joins enabled
+ * and asserts the number of returned rows.
+ *
+ * @param sql SQL.
+ * @param cache Cache.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param expSize Expected results size.
+ * @param args Arguments.
+ */
+ private void checkQuery(String sql,
+ IgniteCache<Object, Object> cache,
+ boolean enforceJoinOrder,
+ int expSize,
+ Object... args) {
+ SqlFieldsQuery explainQry = new SqlFieldsQuery("explain " + sql);
+
+ explainQry.setDistributedJoins(true);
+ explainQry.setEnforceJoinOrder(enforceJoinOrder);
+ // Bind the same arguments as the real query so that parameterized SQL can be explained too.
+ explainQry.setArgs(args);
+
+ // The plan text is read from the first column of the first row of the explain result.
+ String plan = (String)cache.query(explainQry).getAll().get(0).get(0);
+
+ log.info("Plan: " + plan);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+ qry.setDistributedJoins(true);
+ qry.setEnforceJoinOrder(enforceJoinOrder);
+ qry.setArgs(args);
+
+ QueryCursor<List<?>> cur = cache.query(qry);
+
+ List<List<?>> res = cur.getAll();
+
+ // Dump the rows only on mismatch to make the assertion failure below easier to diagnose.
+ if (expSize != res.size())
+ log.info("Results: " + res);
+
+ assertEquals(expSize, res.size());
+ }
+
+ /**
+ * Builds the query entity for {@link Person} values with {@code Integer} keys, exposing
+ * the {@code orgId} and {@code name} fields and, on request, single-field indexes on them.
+ *
+ * @param idxName Name index flag.
+ * @param idxOrgId Org ID index flag.
+ * @return Entity.
+ */
+ private QueryEntity personEntity(boolean idxName, boolean idxOrgId) {
+ QueryEntity res = new QueryEntity();
+
+ res.setKeyType(Integer.class.getName());
+ res.setValueType(Person.class.getName());
+
+ res.addQueryField("orgId", Integer.class.getName(), null);
+ res.addQueryField("name", String.class.getName(), null);
+
+ // The index list is always set, even when it stays empty.
+ List<QueryIndex> indexes = new ArrayList<>();
+
+ if (idxName)
+ indexes.add(new QueryIndex("name"));
+
+ if (idxOrgId)
+ indexes.add(new QueryIndex("orgId"));
+
+ res.setIndexes(indexes);
+
+ return res;
+ }
+
+ /**
+ * Builds the query entity for {@link Organization} values with {@code Integer} keys, exposing
+ * the {@code name} field and, on request, an index on it.
+ *
+ * @param idxName Name index flag.
+ * @return Entity.
+ */
+ private QueryEntity organizationEntity(boolean idxName) {
+ QueryEntity res = new QueryEntity();
+
+ res.setKeyType(Integer.class.getName());
+ res.setValueType(Organization.class.getName());
+
+ res.addQueryField("name", String.class.getName(), null);
+
+ // Indexes are set only when requested; otherwise the entity keeps its defaults.
+ if (idxName)
+ res.setIndexes(F.asList(new QueryIndex("name")));
+
+ return res;
+ }
+
+ /**
+ * Puts five organizations and their persons through the client node: the organization created
+ * on iteration {@code i} gets {@code i} persons. Resets {@code total} and increments it once
+ * per person put.
+ *
+ * @return Organization ids.
+ */
+ private List<Integer> putData1() {
+ total = 0;
+
+ Ignite client = grid(2);
+
+ Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+ IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+ IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+
+ AtomicInteger pKey = new AtomicInteger();
+ AtomicInteger orgKey = new AtomicInteger();
+
+ ClusterNode node0 = ignite(0).cluster().localNode();
+ ClusterNode node1 = ignite(1).cluster().localNode();
+
+ List<Integer> data = new ArrayList<>();
+
+ for (int i = 0; i < 5; i++) {
+ // Organization keys are requested for node0, person keys for node1.
+ // NOTE(review): keyForNode is defined outside this view - confirm its exact contract.
+ int orgId = keyForNode(aff, orgKey, node0);
+
+ orgCache.put(orgId, new Organization("obj-" + orgId));
+
+ for (int j = 0; j < i; j++) {
+ personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "obj-" + orgId));
+
+ total++;
+ }
+
+ data.add(orgId);
+ }
+
+ return data;
+ }
+
+ /**
+ * Creates a configuration for an atomic cache with full-sync writes, primary write order
+ * mode and no backups.
+ *
+ * @param name Cache name.
+ * @return Configuration.
+ */
+ private CacheConfiguration cacheConfiguration(String name) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(name);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setBackups(0);
+
+ return ccfg;
+ }
+
+ /**
+ * Person cache value holding an organization ID and a name.
+ */
+ private static class Person implements Serializable {
+ /** Organization ID. */
+ int orgId;
+
+ /** Name. */
+ String name;
+
+ /**
+ * @param orgId Organization ID.
+ * @param name Name.
+ */
+ public Person(int orgId, String name) {
+ this.orgId = orgId;
+ this.name = name;
+ }
+ }
+
+ /**
+ * Organization cache value holding a name.
+ */
+ private static class Organization implements Serializable {
+ /** Name. */
+ String name;
+
+ /**
+ * @param name Name.
+ */
+ public Organization(String name) {
+ this.name = name;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java
new file mode 100644
index 0000000..e8b363f
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinNoIndexTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests a distributed join between the person and organization caches on fields for which
+ * no SQL index is configured.
+ */
+public class IgniteCacheJoinNoIndexTest extends GridCommonAbstractTest {
+ /** IP finder shared by all nodes started by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Person cache name. */
+ private static final String PERSON_CACHE = "person";
+
+ /** Organization cache name. */
+ private static final String ORG_CACHE = "org";
+
+ /** Client mode flag applied to nodes started after it is set. */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+ spi.setIpFinder(IP_FINDER);
+
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ // Person cache: query fields only, no indexes.
+ {
+ CacheConfiguration ccfg = configuration(PERSON_CACHE);
+
+ QueryEntity entity = new QueryEntity();
+ entity.setKeyType(Integer.class.getName());
+ entity.setValueType(Person.class.getName());
+ entity.addQueryField("orgId", Integer.class.getName(), null);
+ entity.addQueryField("orgName", String.class.getName(), null);
+
+ ccfg.setQueryEntities(F.asList(entity));
+
+ ccfgs.add(ccfg);
+ }
+
+ // Organization cache: query fields only, no indexes.
+ {
+ CacheConfiguration ccfg = configuration(ORG_CACHE);
+
+ QueryEntity entity = new QueryEntity();
+ entity.setKeyType(Integer.class.getName());
+ entity.setValueType(Organization.class.getName());
+ entity.addQueryField("name", String.class.getName(), null);
+
+ ccfg.setQueryEntities(F.asList(entity));
+
+ ccfgs.add(ccfg);
+ }
+
+ cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * Creates a configuration for an atomic cache with full-sync writes, primary write order
+ * mode and no backups.
+ *
+ * @param name Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration configuration(String name) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(name);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setBackups(0);
+
+ return ccfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ // Grids 0 and 1 are started while the client flag is still false.
+ startGrids(2);
+
+ // Grid 2 is started with client mode enabled.
+ client = true;
+
+ startGrid(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * Joins organizations and persons on the non-indexed name fields and checks the result size.
+ *
+ * @throws Exception If failed.
+ */
+ public void testJoin() throws Exception {
+ Ignite client = grid(2);
+
+ Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+ IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+ IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+
+ AtomicInteger pKey = new AtomicInteger(100_000);
+ AtomicInteger orgKey = new AtomicInteger();
+
+ ClusterNode node0 = ignite(0).cluster().localNode();
+ ClusterNode node1 = ignite(1).cluster().localNode();
+
+ // Organization i gets i persons carrying its name: 0 + 1 + 2 = 3 persons in total.
+ // Organization keys are requested for node0, person keys for node1.
+ for (int i = 0; i < 3; i++) {
+ int orgId = keyForNode(aff, orgKey, node0);
+
+ orgCache.put(orgId, new Organization("org-" + i));
+
+ for (int j = 0; j < i; j++)
+ personCache.put(keyForNode(aff, pKey, node1), new Person(orgId, "org-" + i));
+ }
+
+ // Each person is expected to match exactly one organization by name.
+ checkQuery("select o.name, p._key, p.orgName " +
+ "from \"org\".Organization o, \"person\".Person p " +
+ "where p.orgName = o.name", personCache, false, 3);
+ }
+
+ /**
+ * Logs the execution plan of the query, then runs the query with distributed joins enabled
+ * and asserts the number of returned rows.
+ *
+ * @param sql SQL.
+ * @param cache Cache.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param expSize Expected results size.
+ * @param args Arguments.
+ */
+ private void checkQuery(String sql,
+ IgniteCache<Object, Object> cache,
+ boolean enforceJoinOrder,
+ int expSize,
+ Object... args) {
+ SqlFieldsQuery explainQry = new SqlFieldsQuery("explain " + sql);
+
+ explainQry.setDistributedJoins(true);
+ explainQry.setEnforceJoinOrder(enforceJoinOrder);
+ // Bind the same arguments as the real query so that parameterized SQL can be explained too.
+ explainQry.setArgs(args);
+
+ // The plan text is read from the first column of the first row of the explain result.
+ String plan = (String)cache.query(explainQry).getAll().get(0).get(0);
+
+ log.info("Plan: " + plan);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+ qry.setDistributedJoins(true);
+ qry.setEnforceJoinOrder(enforceJoinOrder);
+ qry.setArgs(args);
+
+ QueryCursor<List<?>> cur = cache.query(qry);
+
+ List<List<?>> res = cur.getAll();
+
+ // Dump the rows only on mismatch to make the assertion failure below easier to diagnose.
+ if (expSize != res.size())
+ log.info("Results: " + res);
+
+ assertEquals(expSize, res.size());
+ }
+
+ /**
+ * Person cache value holding an organization ID and an organization name.
+ */
+ private static class Person implements Serializable {
+ /** Organization ID. */
+ int orgId;
+
+ /** Organization name. */
+ String orgName;
+
+ /**
+ * @param orgId Organization ID.
+ * @param orgName Organization name.
+ */
+ public Person(int orgId, String orgName) {
+ this.orgId = orgId;
+ this.orgName = orgName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Person.class, this);
+ }
+ }
+
+ /**
+ * Organization cache value holding a name.
+ */
+ private static class Organization implements Serializable {
+ /** Name. */
+ String name;
+
+ /**
+ * @param name Name.
+ */
+ public Organization(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Organization.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
new file mode 100644
index 0000000..92d4f54
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String PERSON_CACHE = "person";
+
+ /** */
+ private static final String ORG_CACHE = "org";
+
+ /** */
+ private static final String ACCOUNT_CACHE = "acc";
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi spi = ((TcpDiscoverySpi)cfg.getDiscoverySpi());
+
+ spi.setIpFinder(IP_FINDER);
+
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ {
+ CacheConfiguration ccfg = configuration(PERSON_CACHE);
+
+ // One cache is replicated.
+ ccfg.setCacheMode(REPLICATED);
+
+ QueryEntity entity = new QueryEntity();
+ entity.setKeyType(Integer.class.getName());
+ entity.setValueType(Person.class.getName());
+ entity.addQueryField("orgId", Integer.class.getName(), null);
+ entity.addQueryField("name", String.class.getName(), null);
+
+ ccfg.setQueryEntities(F.asList(entity));
+
+ ccfgs.add(ccfg);
+ }
+
+ {
+ CacheConfiguration ccfg = configuration(ORG_CACHE);
+
+ QueryEntity entity = new QueryEntity();
+ entity.setKeyType(Integer.class.getName());
+ entity.setValueType(Organization.class.getName());
+ entity.addQueryField("name", String.class.getName(), null);
+
+ ccfg.setQueryEntities(F.asList(entity));
+
+ ccfgs.add(ccfg);
+ }
+
+ {
+ CacheConfiguration ccfg = configuration(ACCOUNT_CACHE);
+
+ QueryEntity entity = new QueryEntity();
+ entity.setKeyType(Integer.class.getName());
+ entity.setValueType(Account.class.getName());
+ entity.addQueryField("orgId", Integer.class.getName(), null);
+ entity.addQueryField("personId", Integer.class.getName(), null);
+ entity.addQueryField("name", String.class.getName(), null);
+
+ ccfg.setQueryEntities(F.asList(entity));
+
+ ccfgs.add(ccfg);
+ }
+
+ cfg.setCacheConfiguration(ccfgs.toArray(new CacheConfiguration[ccfgs.size()]));
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration configuration(String name) {
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName(name);
+ ccfg.setWriteSynchronizationMode(FULL_SYNC);
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
+ ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setBackups(1);
+
+ return ccfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(2);
+
+ client = true;
+
+ startGrid(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testJoin() throws Exception {
+ Ignite client = grid(2);
+
+ Affinity<Object> aff = client.affinity(PERSON_CACHE);
+
+ IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
+ IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
+ IgniteCache<Object, Object> accCache = client.cache(ACCOUNT_CACHE);
+
+ AtomicInteger pKey = new AtomicInteger(100_000);
+ AtomicInteger orgKey = new AtomicInteger();
+ AtomicInteger accKey = new AtomicInteger();
+
+ ClusterNode node0 = ignite(0).cluster().localNode();
+ ClusterNode node1 = ignite(1).cluster().localNode();
+
+ /**
+ * One organization, two persons, two accounts.
+ */
+
+ int orgId1 = keyForNode(aff, orgKey, node0);
+
+ orgCache.put(orgId1, new Organization("obj-" + orgId1));
+
+ int pid = keyForNode(aff, pKey, node0);
+ personCache.put(pid, new Person(orgId1, "o1-p1"));
+
+ accCache.put(keyForNode(aff, accKey, node0), new Account(pid, orgId1));
+ accCache.put(keyForNode(aff, accKey, node1), new Account(pid, orgId1));
+
+ checkQuery("select o.name, p._key, p.name, a.name " +
+ "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " +
+ "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2);
+
+ checkQuery("select o.name, p._key, p.name, a.name " +
+ "from \"org\".Organization o, \"acc\".Account a, \"person\".Person p " +
+ "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2);
+
+ checkQuery("select o.name, p._key, p.name, a.name " +
+ "from \"person\".Person p, \"org\".Organization o, \"acc\".Account a " +
+ "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2);
+ }
+
+ /**
+ * @param sql SQL.
+ * @param cache Cache.
+ * @param enforceJoinOrder Enforce join order flag.
+ * @param expSize Expected results size.
+ * @param args Arguments.
+ */
+ private void checkQuery(String sql,
+ IgniteCache<Object, Object> cache,
+ boolean enforceJoinOrder,
+ int expSize,
+ Object... args) {
+ String plan = (String)cache.query(new SqlFieldsQuery("explain " + sql)
+ .setDistributedJoins(true)
+ .setEnforceJoinOrder(enforceJoinOrder))
+ .getAll().get(0).get(0);
+
+ log.info("Plan: " + plan);
+
+ SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+ qry.setDistributedJoins(true);
+ qry.setEnforceJoinOrder(enforceJoinOrder);
+ qry.setArgs(args);
+
+ QueryCursor<List<?>> cur = cache.query(qry);
+
+ List<List<?>> res = cur.getAll();
+
+ if (expSize != res.size())
+ log.info("Results: " + res);
+
+ assertEquals(expSize, res.size());
+ }
+
+ /**
+ *
+ */
+ private static class Account implements Serializable {
+ /** */
+ int personId;
+
+ /** */
+ int orgId;
+
+ /** */
+ String name;
+
+ /**
+ * @param personId Person ID.
+ * @param orgId Organization ID.
+ */
+ public Account(int personId, int orgId) {
+ this.personId = personId;
+ this.orgId = orgId;
+ name = "acc-" + personId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Account.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class Person implements Serializable {
+ /** */
+ int orgId;
+
+ /** */
+ String name;
+
+ /**
+ * @param orgId Organization ID.
+ * @param name Name.
+ */
+ public Person(int orgId, String name) {
+ this.orgId = orgId;
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Person.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class Organization implements Serializable {
+ /** */
+ String name;
+
+ /**
+ * @param name Name.
+ */
+ public Organization(String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(Organization.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/bbfef04a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index ce4d24b..6587631 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -475,9 +475,11 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
* Test value.
*/
private static class GroupIndexTestValue implements Serializable {
+ /** */
@QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 0))
private int a;
+ /** */
@QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 1))
private int b;
@@ -491,36 +493,59 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
}
}
+ /**
+ *
+ */
private static class Person2 implements Serializable {
+ /** */
@QuerySqlField
int orgId;
+ /** */
@QuerySqlField
String name;
}
+ /**
+ *
+ */
private static class Organization implements Serializable {
+ /** */
@QuerySqlField
String name;
}
+ /**
+ *
+ */
private static class User implements Serializable {
+ /** */
@QuerySqlField
private int id;
}
+ /**
+ *
+ */
private static class UserOrder implements Serializable {
+ /** */
@QuerySqlField
private int id;
+ /** */
@QuerySqlField
private int userId;
}
+ /**
+ *
+ */
private static class OrderGood implements Serializable {
+ /** */
@QuerySqlField
private int orderId;
+ /** */
@QuerySqlField
private int goodId;
}
[2/2] ignite git commit: Merge remote-tracking branch
'origin/ignite-1232' into ignite-1232
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-1232' into ignite-1232
# Conflicts:
# modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/407b306d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/407b306d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/407b306d
Branch: refs/heads/ignite-1232
Commit: 407b306d5a6e7b59f2285feaeb2364f9bef31f32
Parents: bbfef04 8ab0608
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 30 19:51:34 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 30 19:51:34 2016 +0300
----------------------------------------------------------------------
----------------------------------------------------------------------