You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:51:54 UTC
[10/50] [abbrv] ignite git commit: ignite-split2 - distributedJoins
ignite-split2 - distributedJoins
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a1d6079
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a1d6079
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a1d6079
Branch: refs/heads/ignite-1232
Commit: 7a1d6079abcf03361c35931ca8278969b2104df9
Parents: 71f394c
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon Dec 7 07:58:49 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon Dec 7 07:58:49 2015 +0300
----------------------------------------------------------------------
.../ignite/cache/query/SqlFieldsQuery.java | 24 ++++++
.../org/apache/ignite/cache/query/SqlQuery.java | 25 ++++++
.../cache/query/GridCacheTwoStepQuery.java | 28 ++++---
.../processors/query/h2/IgniteH2Indexing.java | 88 ++++++++++++++++++--
.../query/h2/sql/GridSqlQuerySplitter.java | 6 +-
.../h2/twostep/GridReduceQueryExecutor.java | 8 +-
.../query/IgniteSqlSplitterSelfTest.java | 6 +-
7 files changed, 161 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 65f8eba..48dab6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -59,6 +59,9 @@ public final class SqlFieldsQuery extends Query<List<?>> {
/** */
private boolean enforceJoinOrder;
+ /** */
+ private boolean distributedJoins;
+
/**
* Constructs SQL fields query.
*
@@ -170,6 +173,27 @@ public final class SqlFieldsQuery extends Query<List<?>> {
return this;
}
+ /**
+ * Specify if distributed joins are enabled for this query.
+ *
+ * @param distributedJoins Distributed joins enabled.
+ * @return {@code this} For chaining.
+ */
+ public SqlFieldsQuery setDistributedJoins(boolean distributedJoins) {
+ this.distributedJoins = distributedJoins;
+
+ return this;
+ }
+
+ /**
+ * Check if distributed joins are enabled for this query.
+ *
+ * @return {@code true} If distributed joins are enabled.
+ */
+ public boolean isDistributedJoins() {
+ return distributedJoins;
+ }
+
/** {@inheritDoc} */
@Override public SqlFieldsQuery setPageSize(int pageSize) {
return (SqlFieldsQuery)super.setPageSize(pageSize);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index be3b390..e05ff13 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -43,6 +43,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
@GridToStringInclude
private Object[] args;
+ /** */
+ private boolean distributedJoins;
+
/**
* Constructs query for the given type name and SQL query.
*
@@ -142,11 +145,33 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
/**
* @param type Type.
+ * @return {@code this} For chaining.
*/
public SqlQuery setType(Class<?> type) {
return setType(GridQueryProcessor.typeName(type));
}
+ /**
+ * Specify if distributed joins are enabled for this query.
+ *
+ * @param distributedJoins Distributed joins enabled.
+ * @return {@code this} For chaining.
+ */
+ public SqlQuery setDistributedJoins(boolean distributedJoins) {
+ this.distributedJoins = distributedJoins;
+
+ return this;
+ }
+
+ /**
+ * Check if distributed joins are enabled for this query.
+ *
+ * @return {@code true} If distributed joins are enabled.
+ */
+ public boolean isDistributedJoins() {
+ return distributedJoins;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SqlQuery.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 42cde28..7d7715a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -51,7 +51,7 @@ public class GridCacheTwoStepQuery {
private Set<String> tbls;
/** */
- private boolean fullCollocation;
+ private boolean distributedJoins;
/** */
private boolean skipMergeTbl;
@@ -66,11 +66,24 @@ public class GridCacheTwoStepQuery {
}
/**
- * @param fullCollocation If it is a collocated query and no distributed joins can occur.
+ * Specify if distributed joins are enabled for this query.
+ *
+ * @param distributedJoins Distributed joins enabled.
*/
- public void fullCollocation(boolean fullCollocation) {
- this.fullCollocation = fullCollocation;
+ public void distributedJoins(boolean distributedJoins) {
+ this.distributedJoins = distributedJoins;
}
+
+ /**
+ * Check if distributed joins are enabled for this query.
+ *
+ * @return {@code true} If distributed joins are enabled.
+ */
+ public boolean distributedJoins() {
+ return distributedJoins;
+ }
+
+
/**
* @return {@code True} if reduce query can skip merge table creation and get data directly from merge index.
*/
@@ -86,13 +99,6 @@ public class GridCacheTwoStepQuery {
}
/**
- * @return {@code true} If it is a collocated query and no distributed joins can occur.
- */
- public boolean fullCollocation() {
- return fullCollocation;
- }
-
- /**
* @return If this is explain query.
*/
public boolean explain() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 802a0fa..067af47 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -105,7 +105,6 @@ import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -298,7 +297,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private final ConcurrentHashMap<Thread, StatementCache> stmtCache = new ConcurrentHashMap<>();
/** */
- private final GridBoundedConcurrentLinkedHashMap<T3<String, String, Boolean>, TwoStepCachedQuery> twoStepCache =
+ private final GridBoundedConcurrentLinkedHashMap<TwoStepCachedQueryKey, TwoStepCachedQuery> twoStepCache =
new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
/**
@@ -918,7 +917,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param filter Filter.
*/
private void initLocalQueryContext(IndexingQueryFilter filter) {
- GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter));
+ GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
}
/** {@inheritDoc} */
@@ -983,6 +982,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
fqry.setArgs(qry.getArgs());
fqry.setPageSize(qry.getPageSize());
+ fqry.setDistributedJoins(qry.isDistributedJoins());
final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
@@ -1051,11 +1051,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Connection c = connectionForSpace(space);
final boolean enforceJoinOrder = qry.isEnforceJoinOrder();
+ final boolean distributedJoins = qry.isDistributedJoins();
+ final boolean groupByCollocated = qry.isCollocated();
GridCacheTwoStepQuery twoStepQry;
List<GridQueryFieldMetadata> meta;
- final T3<String, String, Boolean> cachedQryKey = new T3<>(space, sqlQry, qry.isCollocated());
+ final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, groupByCollocated,
+ distributedJoins, enforceJoinOrder);
TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
if (cachedQry != null) {
@@ -1067,7 +1070,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Session ses = session(c);
GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
- .distributedJoins(isPartitioned(cctx)));
+ .distributedJoins(distributedJoins && isPartitioned(cctx)));
enforceJoinOrder(enforceJoinOrder);
ses.setJoinBatchEnabled(false);
@@ -1088,7 +1091,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
try {
bindParameters(stmt, F.asList(qry.getArgs()));
- twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), qry.isCollocated());
+ twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), qry.isCollocated(),
+ distributedJoins);
meta = meta(stmt.getMetaData());
}
@@ -1758,6 +1762,78 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * Key for cached two-step query.
+ */
+ private static final class TwoStepCachedQueryKey {
+ /** Space. */
+ private final String space;
+
+ /** SQL query text. */
+ private final String sql;
+
+ /** Collocated GROUP BY flag. */
+ private final boolean groupByCollocated;
+
+ /** Distributed joins enabled flag. */
+ private final boolean distributedJoins;
+
+ /** Enforce join order of tables flag. */
+ private final boolean enforceJoinOrder;
+
+ /**
+ * @param space Space.
+ * @param sql Sql.
+ * @param groupByCollocated Collocated GROUP BY.
+ * @param distributedJoins Distributed joins enabled.
+ * @param enforceJoinOrder Enforce join order of tables.
+ */
+ private TwoStepCachedQueryKey(String space, String sql, boolean groupByCollocated, boolean distributedJoins,
+ boolean enforceJoinOrder) {
+ this.space = space;
+ this.sql = sql;
+ this.groupByCollocated = groupByCollocated;
+ this.distributedJoins = distributedJoins;
+ this.enforceJoinOrder = enforceJoinOrder;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ TwoStepCachedQueryKey that = (TwoStepCachedQueryKey)o;
+
+ if (groupByCollocated != that.groupByCollocated)
+ return false;
+
+ if (distributedJoins != that.distributedJoins)
+ return false;
+
+ if (enforceJoinOrder != that.enforceJoinOrder)
+ return false;
+
+ if (space != null ? !space.equals(that.space) : that.space != null)
+ return false;
+
+ return sql.equals(that.sql); // NOTE(review): space is null-checked but sql is not; assumes sql is never null - confirm.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int result = space != null ? space.hashCode() : 0;
+ result = 31 * result + sql.hashCode();
+ result = 31 * result + (groupByCollocated ? 1 : 0);
+ result = 31 * result + (distributedJoins ? 1 : 0);
+ result = 31 * result + (enforceJoinOrder ? 1 : 0);
+
+ return result;
+ }
+ }
+
+ /**
* Cached two-step query.
*/
private static final class TwoStepCachedQuery {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 0d41e95..1bdb26a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -144,12 +144,14 @@ public class GridSqlQuerySplitter {
* @param stmt Prepared statement.
* @param params Parameters.
* @param collocatedGroupBy Whether the query has collocated GROUP BY keys.
+ * @param distributedJoins If distributed joins enabled.
* @return Two step query.
*/
public static GridCacheTwoStepQuery split(
JdbcPreparedStatement stmt,
Object[] params,
- boolean collocatedGroupBy
+ final boolean collocatedGroupBy,
+ final boolean distributedJoins
) {
if (params == null)
params = GridCacheSqlQuery.EMPTY_PARAMS;
@@ -178,7 +180,7 @@ public class GridSqlQuerySplitter {
// We do not have to look at each map query separately here, because if
// the whole initial query is collocated, then all the map sub-queries
// will be collocated as well.
- res.fullCollocation(isCollocated(query(prepared)));
+ res.distributedJoins(distributedJoins && !isCollocated(query(prepared)));
return res;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 134631c..f80539e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -610,8 +610,8 @@ public class GridReduceQueryExecutor {
boolean retry = false;
- final boolean oldStyle = true; oldNodesInTopology(); // TODO =
- final boolean distributedJoins = !qry.fullCollocation();
+ final boolean oldStyle = oldNodesInTopology();
+ final boolean distributedJoins = qry.distributedJoins();
if (send(nodes,
oldStyle ?
@@ -682,8 +682,8 @@ public class GridReduceQueryExecutor {
else {
UUID locNodeId = ctx.localNodeId();
- GridH2QueryContext.set(
- new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE).pageSize(r.pageSize));
+ GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
+ .pageSize(r.pageSize).distributedJoins(false));
h2.enforceJoinOrder(enforceJoinOrder);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 596e157..57df338 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -235,8 +236,11 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
c.put(key++, p);
}
+ X.println("Plan : " + c.query(new SqlFieldsQuery("explain select count(*) from Person p, Organization o " +
+ "where p.orgId = o._key").setDistributedJoins(true)).getAll());
+
assertEquals(15000L, c.query(new SqlFieldsQuery("select count(*) from Person p, Organization o " +
- "where p.orgId = o._key")).getAll().get(0).get(0));
+ "where p.orgId = o._key").setDistributedJoins(true)).getAll().get(0).get(0));
}
finally {
c.destroy();