You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:51:54 UTC

[10/50] [abbrv] ignite git commit: ignite-split2 - distributedJoins

ignite-split2 - distributedJoins


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a1d6079
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a1d6079
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a1d6079

Branch: refs/heads/ignite-1232
Commit: 7a1d6079abcf03361c35931ca8278969b2104df9
Parents: 71f394c
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon Dec 7 07:58:49 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon Dec 7 07:58:49 2015 +0300

----------------------------------------------------------------------
 .../ignite/cache/query/SqlFieldsQuery.java      | 24 ++++++
 .../org/apache/ignite/cache/query/SqlQuery.java | 25 ++++++
 .../cache/query/GridCacheTwoStepQuery.java      | 28 ++++---
 .../processors/query/h2/IgniteH2Indexing.java   | 88 ++++++++++++++++++--
 .../query/h2/sql/GridSqlQuerySplitter.java      |  6 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  8 +-
 .../query/IgniteSqlSplitterSelfTest.java        |  6 +-
 7 files changed, 161 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
index 65f8eba..48dab6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlFieldsQuery.java
@@ -59,6 +59,9 @@ public final class SqlFieldsQuery extends Query<List<?>> {
     /** */
     private boolean enforceJoinOrder;
 
+    /** */
+    private boolean distributedJoins;
+
     /**
      * Constructs SQL fields query.
      *
@@ -170,6 +173,27 @@ public final class SqlFieldsQuery extends Query<List<?>> {
         return this;
     }
 
+    /**
+     * Specify if distributed joins are enabled for this query.
+     *
+     * @param distributedJoins Distributed joins enabled.
+     * @return {@code this} For chaining.
+     */
+    public SqlFieldsQuery setDistributedJoins(boolean distributedJoins) {
+        this.distributedJoins = distributedJoins;
+
+        return this;
+    }
+
+    /**
+     * Check if distributed joins are enabled for this query.
+     *
+     * @return {@code true} If distributed joind enabled.
+     */
+    public boolean isDistributedJoins() {
+        return distributedJoins;
+    }
+
     /** {@inheritDoc} */
     @Override public SqlFieldsQuery setPageSize(int pageSize) {
         return (SqlFieldsQuery)super.setPageSize(pageSize);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
index be3b390..e05ff13 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/SqlQuery.java
@@ -43,6 +43,9 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
     @GridToStringInclude
     private Object[] args;
 
+    /** */
+    private boolean distributedJoins;
+
     /**
      * Constructs query for the given type name and SQL query.
      *
@@ -142,11 +145,33 @@ public final class SqlQuery<K, V> extends Query<Cache.Entry<K, V>> {
 
     /**
      * @param type Type.
+     * @return {@code this} For chaining.
      */
     public SqlQuery setType(Class<?> type) {
         return setType(GridQueryProcessor.typeName(type));
     }
 
+    /**
+     * Specify if distributed joins are enabled for this query.
+     *
+     * @param distributedJoins Distributed joins enabled.
+     * @return {@code this} For chaining.
+     */
+    public SqlQuery setDistributedJoins(boolean distributedJoins) {
+        this.distributedJoins = distributedJoins;
+
+        return this;
+    }
+
+    /**
+     * Check if distributed joins are enabled for this query.
+     *
+     * @return {@code true} If distributed joind enabled.
+     */
+    public boolean isDistributedJoins() {
+        return distributedJoins;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(SqlQuery.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index 42cde28..7d7715a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -51,7 +51,7 @@ public class GridCacheTwoStepQuery {
     private Set<String> tbls;
 
     /** */
-    private boolean fullCollocation;
+    private boolean distributedJoins;
 
     /** */
     private boolean skipMergeTbl;
@@ -66,11 +66,24 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @param fullCollocation If it is a collocated query and no distributed joins can occur.
+     * Specify if distributed joins are enabled for this query.
+     *
+     * @param distributedJoins Distributed joins enabled.
      */
-    public void fullCollocation(boolean fullCollocation) {
-        this.fullCollocation = fullCollocation;
+    public void distributedJoins(boolean distributedJoins) {
+        this.distributedJoins = distributedJoins;
     }
+
+    /**
+     * Check if distributed joins are enabled for this query.
+     *
+     * @return {@code true} If distributed joind enabled.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
+    }
+
+
     /**
      * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index.
      */
@@ -86,13 +99,6 @@ public class GridCacheTwoStepQuery {
     }
 
     /**
-     * @return {@code true} If it is a collocated query and no distributed joins can occur.
-     */
-    public boolean fullCollocation() {
-        return fullCollocation;
-    }
-
-    /**
      * @return If this is explain query.
      */
     public boolean explain() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 802a0fa..067af47 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -105,7 +105,6 @@ import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -298,7 +297,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final ConcurrentHashMap<Thread, StatementCache> stmtCache = new ConcurrentHashMap<>();
 
     /** */
-    private final GridBoundedConcurrentLinkedHashMap<T3<String, String, Boolean>, TwoStepCachedQuery> twoStepCache =
+    private final GridBoundedConcurrentLinkedHashMap<TwoStepCachedQueryKey, TwoStepCachedQuery> twoStepCache =
         new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
 
     /**
@@ -918,7 +917,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param filter Filter.
      */
     private void initLocalQueryContext(IndexingQueryFilter filter) {
-        GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter));
+        GridH2QueryContext.set(new GridH2QueryContext(nodeId, nodeId, 0, LOCAL).filter(filter).distributedJoins(false));
     }
 
     /** {@inheritDoc} */
@@ -983,6 +982,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         fqry.setArgs(qry.getArgs());
         fqry.setPageSize(qry.getPageSize());
+        fqry.setDistributedJoins(qry.isDistributedJoins());
 
         final QueryCursor<List<?>> res = queryTwoStep(cctx, fqry);
 
@@ -1051,11 +1051,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
         Connection c = connectionForSpace(space);
         final boolean enforceJoinOrder = qry.isEnforceJoinOrder();
+        final boolean distributedJoins = qry.isDistributedJoins();
+        final boolean groupByCollocated = qry.isCollocated();
 
         GridCacheTwoStepQuery twoStepQry;
         List<GridQueryFieldMetadata> meta;
 
-        final T3<String, String, Boolean> cachedQryKey = new T3<>(space, sqlQry, qry.isCollocated());
+        final TwoStepCachedQueryKey cachedQryKey = new TwoStepCachedQueryKey(space, sqlQry, groupByCollocated,
+            distributedJoins, enforceJoinOrder);
         TwoStepCachedQuery cachedQry = twoStepCache.get(cachedQryKey);
 
         if (cachedQry != null) {
@@ -1067,7 +1070,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             Session ses = session(c);
 
             GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, 0, PREPARE)
-                .distributedJoins(isPartitioned(cctx)));
+                .distributedJoins(distributedJoins && isPartitioned(cctx)));
             enforceJoinOrder(enforceJoinOrder);
             ses.setJoinBatchEnabled(false);
 
@@ -1088,7 +1091,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             try {
                 bindParameters(stmt, F.asList(qry.getArgs()));
 
-                twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), qry.isCollocated());
+                twoStepQry = GridSqlQuerySplitter.split((JdbcPreparedStatement)stmt, qry.getArgs(), qry.isCollocated(),
+                    distributedJoins);
 
                 meta = meta(stmt.getMetaData());
             }
@@ -1758,6 +1762,78 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Key for cached two-step query.
+     */
+    private static final class TwoStepCachedQueryKey {
+        /** */
+        private final String space;
+
+        /** */
+        private final String sql;
+
+        /** */
+        private final boolean groupByCollocated;
+
+        /** */
+        private final boolean distributedJoins;
+
+        /** */
+        private final boolean enforceJoinOrder;
+
+        /**
+         * @param space Space.
+         * @param sql Sql.
+         * @param groupByCollocated Collocated GROUP BY.
+         * @param distributedJoins Distributed joins enabled.
+         * @param enforceJoinOrder Enforce join order of tables.
+         */
+        private TwoStepCachedQueryKey(String space, String sql, boolean groupByCollocated, boolean distributedJoins,
+            boolean enforceJoinOrder) {
+            this.space = space;
+            this.sql = sql;
+            this.groupByCollocated = groupByCollocated;
+            this.distributedJoins = distributedJoins;
+            this.enforceJoinOrder = enforceJoinOrder;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TwoStepCachedQueryKey that = (TwoStepCachedQueryKey)o;
+
+            if (groupByCollocated != that.groupByCollocated)
+                return false;
+
+            if (distributedJoins != that.distributedJoins)
+                return false;
+
+            if (enforceJoinOrder != that.enforceJoinOrder)
+                return false;
+
+            if (space != null ? !space.equals(that.space) : that.space != null)
+                return false;
+
+            return sql.equals(that.sql);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int result = space != null ? space.hashCode() : 0;
+            result = 31 * result + sql.hashCode();
+            result = 31 * result + (groupByCollocated ? 1 : 0);
+            result = 31 * result + (distributedJoins ? 1 : 0);
+            result = 31 * result + (enforceJoinOrder ? 1 : 0);
+
+            return result;
+        }
+    }
+
+    /**
      * Cached two-step query.
      */
     private static final class TwoStepCachedQuery {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 0d41e95..1bdb26a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -144,12 +144,14 @@ public class GridSqlQuerySplitter {
      * @param stmt Prepared statement.
      * @param params Parameters.
      * @param collocatedGroupBy Whether the query has collocated GROUP BY keys.
+     * @param distributedJoins If distributed joins enabled.
      * @return Two step query.
      */
     public static GridCacheTwoStepQuery split(
         JdbcPreparedStatement stmt,
         Object[] params,
-        boolean collocatedGroupBy
+        final boolean collocatedGroupBy,
+        final boolean distributedJoins
     ) {
         if (params == null)
             params = GridCacheSqlQuery.EMPTY_PARAMS;
@@ -178,7 +180,7 @@ public class GridSqlQuerySplitter {
         // We do not have to look at each map query separately here, because if
         // the whole initial query is collocated, then all the map sub-queries
         // will be collocated as well.
-        res.fullCollocation(isCollocated(query(prepared)));
+        res.distributedJoins(distributedJoins && !isCollocated(query(prepared)));
 
         return res;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 134631c..f80539e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -610,8 +610,8 @@ public class GridReduceQueryExecutor {
 
                 boolean retry = false;
 
-                final boolean oldStyle = true; oldNodesInTopology(); // TODO =
-                final boolean distributedJoins = !qry.fullCollocation();
+                final boolean oldStyle = oldNodesInTopology();
+                final boolean distributedJoins = qry.distributedJoins();
 
                 if (send(nodes,
                     oldStyle ?
@@ -682,8 +682,8 @@ public class GridReduceQueryExecutor {
                     else {
                         UUID locNodeId = ctx.localNodeId();
 
-                        GridH2QueryContext.set(
-                            new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE).pageSize(r.pageSize));
+                        GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
+                            .pageSize(r.pageSize).distributedJoins(false));
 
                         h2.enforceJoinOrder(enforceJoinOrder);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/7a1d6079/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
index 596e157..57df338 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -235,8 +236,11 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest {
                 c.put(key++, p);
             }
 
+            X.println("Plan : " + c.query(new SqlFieldsQuery("explain select count(*) from Person p, Organization o " +
+                "where p.orgId = o._key").setDistributedJoins(true)).getAll());
+
             assertEquals(15000L, c.query(new SqlFieldsQuery("select count(*) from Person p, Organization o " +
-                "where p.orgId = o._key")).getAll().get(0).get(0));
+                "where p.orgId = o._key").setDistributedJoins(true)).getAll().get(0).get(0));
         }
         finally {
             c.destroy();