You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:52:26 UTC

[42/50] [abbrv] ignite git commit: ignite-split2 - optimizations

ignite-split2 - optimizations


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd5af493
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd5af493
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd5af493

Branch: refs/heads/ignite-1232
Commit: bd5af493a84d373a81188af43f1fb3a6071bdfd7
Parents: e0c86cd
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Dec 29 16:22:08 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Dec 29 16:22:08 2015 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMergeIndex.java        | 24 +++++---
 .../h2/twostep/GridReduceQueryExecutor.java     | 63 +++++++++++---------
 2 files changed, 52 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bd5af493/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 2f06ac3..d3537da 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -18,16 +18,19 @@
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.ConcurrentModificationException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
 import org.h2.engine.Session;
 import org.h2.index.BaseIndex;
@@ -40,7 +43,6 @@ import org.h2.result.SortOrder;
 import org.h2.table.IndexColumn;
 import org.h2.table.TableFilter;
 import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -56,7 +58,7 @@ public abstract class GridMergeIndex extends BaseIndex {
     private final AtomicInteger expRowsCnt = new AtomicInteger(0);
 
     /** Remaining rows per source node ID. */
-    private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>();
+    private Map<UUID, Counter> remainingRows;
 
     /** */
     private final AtomicBoolean lastSubmitted = new AtomicBoolean();
@@ -112,11 +114,19 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
-     * @param nodeId Node ID.
+     * Set source nodes.
+     *
+     * @param nodes Nodes.
      */
-    public void addSource(UUID nodeId) {
-        if (remainingRows.put(nodeId, new Counter()) != null)
-            throw new IllegalStateException();
+    public void setSources(Collection<ClusterNode> nodes) {
+        assert remainingRows == null;
+
+        remainingRows = new HashMap<>(nodes.size(), 1f);
+
+        for (ClusterNode node : nodes) {
+            if (remainingRows.put(node.id(), new Counter()) != null)
+                throw new IllegalStateException("Duplicate node id: " + node.id());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd5af493/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index a74681e..3319797 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -111,6 +111,9 @@ public class GridReduceQueryExecutor {
     private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = IgniteProductVersion.fromString("1.4.0");
 
     /** */
+    private boolean oldNodesInTopology = true;
+
+    /** */
     private GridKernalContext ctx;
 
     /** */
@@ -400,7 +403,7 @@ public class GridReduceQueryExecutor {
      * @param topVer Topology version.
      * @param cctx Cache context for main space.
      * @param extraSpaces Extra spaces.
-     * @return Data nodes or {@code null} if repartitioning started and we need to retry..
+     * @return Data nodes or {@code null} if repartitioning started and we need to retry.
      */
     private Collection<ClusterNode> stableDataNodes(
         AffinityTopologyVersion topVer,
@@ -409,11 +412,13 @@ public class GridReduceQueryExecutor {
     ) {
         String space = cctx.name();
 
-        Set<ClusterNode> nodes = new HashSet<>(dataNodes(space, topVer));
+        Collection<ClusterNode> nodes = dataNodes(space, topVer);
 
         if (F.isEmpty(nodes))
             throw new CacheException("Failed to find data nodes for cache: " + space);
 
+        nodes = nodes.size() > 4 ? new HashSet<>(nodes) : new ArrayList<>(nodes);
+
         if (!F.isEmpty(extraSpaces)) {
             for (String extraSpace : extraSpaces) {
                 GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
@@ -469,21 +474,26 @@ public class GridReduceQueryExecutor {
      * @return {@code true} If there are old nodes in topology.
      */
     private boolean oldNodesInTopology() {
-        NavigableMap<IgniteProductVersion,Collection<ClusterNode>> m = ctx.discovery().topologyVersionMap();
-
-        if (F.isEmpty(m))
+        if (!oldNodesInTopology)
             return false;
 
-        for (Map.Entry<IgniteProductVersion,Collection<ClusterNode>> entry : m.entrySet()) {
-            if (entry.getKey().compareTo(DISTRIBUTED_JOIN_SINCE) >= 0)
-                return false;
+        NavigableMap<IgniteProductVersion,Collection<ClusterNode>> m = ctx.discovery().topologyVersionMap();
 
-            for (ClusterNode node : entry.getValue()) {
-                if (!node.isClient() && !node.isDaemon())
-                    return true;
+        if (!F.isEmpty(m)) {
+            for (Map.Entry<IgniteProductVersion,Collection<ClusterNode>> entry : m.entrySet()) {
+                if (entry.getKey().compareTo(DISTRIBUTED_JOIN_SINCE) >= 0)
+                    break;
+
+                for (ClusterNode node : entry.getValue()) {
+                    if (!node.isClient() && !node.isDaemon())
+                        return true;
+                }
             }
         }
 
+        // If we did not find old nodes, we assume that old node will not join further.
+        oldNodesInTopology = false;
+
         return false;
     }
 
@@ -516,7 +526,9 @@ public class GridReduceQueryExecutor {
 
             final String space = cctx.name();
 
-            final QueryRun r = new QueryRun(h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize());
+            List<GridCacheSqlQuery> mapQrys = qry.mapQueries();
+
+            final QueryRun r = new QueryRun(h2.connectionForSpace(space), mapQrys.size(), qry.pageSize());
 
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
@@ -552,18 +564,16 @@ public class GridReduceQueryExecutor {
                 nodes = Collections.singleton(F.rand(nodes));
             }
 
-            int tblIdx = 0;
-
-            final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
+            final boolean skipMergeTbl = qry.skipMergeTable() && !qry.explain();
 
-            for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+            for (int i = 0; i < mapQrys.size(); i++) {
                 GridMergeIndex idx;
 
-                if (!skipMergeTbl || qry.explain()) {
+                if (!skipMergeTbl) {
                     GridMergeTable tbl;
 
                     try {
-                        tbl = createMergeTable(r.conn, mapQry, qry.explain());
+                        tbl = createMergeTable(r.conn, mapQrys.get(i), qry.explain());
                     }
                     catch (IgniteCheckedException e) {
                         throw new IgniteException(e);
@@ -571,18 +581,17 @@ public class GridReduceQueryExecutor {
 
                     idx = tbl.getScanIndex(null);
 
-                    fakeTable(r.conn, tblIdx++).innerTable(tbl);
+                    fakeTable(r.conn, i).innerTable(tbl);
                 }
                 else
                     idx = GridMergeIndexUnsorted.createDummy();
 
-                for (ClusterNode node : nodes)
-                    idx.addSource(node.id());
+                idx.setSources(nodes);
 
                 r.idxs.add(idx);
             }
 
-            r.latch = new CountDownLatch(r.idxs.size() * nodes.size());
+            r.latch = new CountDownLatch(mapQrys.size() * nodes.size());
 
             runs.put(qryReqId, r);
 
@@ -593,8 +602,6 @@ public class GridReduceQueryExecutor {
                             "Client node disconnected."));
                 }
 
-                List<GridCacheSqlQuery> mapQrys = qry.mapQueries();
-
                 if (qry.explain()) {
                     mapQrys = new ArrayList<>(qry.mapQueries().size());
 
@@ -652,13 +659,13 @@ public class GridReduceQueryExecutor {
                 Iterator<List<?>> resIter = null;
 
                 if (!retry) {
-                    if (skipMergeTbl && !qry.explain()) {
-                        List<List<?>> res = new ArrayList<>();
-
+                    if (skipMergeTbl) {
                         assert r.idxs.size() == 1 : r.idxs;
 
                         GridMergeIndex idx = r.idxs.get(0);
 
+                        List<List<?>> res = new ArrayList<>((int)idx.getRowCountApproximation());
+
                         Cursor cur = idx.findInStream(null, null);
 
                         while (cur.next()) {
@@ -747,7 +754,7 @@ public class GridReduceQueryExecutor {
                     U.warn(log, "Query run was already removed: " + qryReqId);
 
                 if (!skipMergeTbl) {
-                    for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
+                    for (int i = 0; i < mapQrys.size(); i++)
                         fakeTable(null, i).innerTable(null); // Drop all merge tables.
                 }
             }