You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:52:26 UTC
[42/50] [abbrv] ignite git commit: ignite-split2 - optimizations
ignite-split2 - optimizations
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd5af493
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd5af493
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd5af493
Branch: refs/heads/ignite-1232
Commit: bd5af493a84d373a81188af43f1fb3a6071bdfd7
Parents: e0c86cd
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Dec 29 16:22:08 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Dec 29 16:22:08 2015 +0300
----------------------------------------------------------------------
.../query/h2/twostep/GridMergeIndex.java | 24 +++++---
.../h2/twostep/GridReduceQueryExecutor.java | 63 +++++++++++---------
2 files changed, 52 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bd5af493/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 2f06ac3..d3537da 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -18,16 +18,19 @@
package org.apache.ignite.internal.processors.query.h2.twostep;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.ConcurrentModificationException;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.CacheException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Cursor;
import org.h2.engine.Session;
import org.h2.index.BaseIndex;
@@ -40,7 +43,6 @@ import org.h2.result.SortOrder;
import org.h2.table.IndexColumn;
import org.h2.table.TableFilter;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_MERGE_TABLE_MAX_SIZE;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
@@ -56,7 +58,7 @@ public abstract class GridMergeIndex extends BaseIndex {
private final AtomicInteger expRowsCnt = new AtomicInteger(0);
/** Remaining rows per source node ID. */
- private final ConcurrentMap<UUID, Counter> remainingRows = new ConcurrentHashMap8<>();
+ private Map<UUID, Counter> remainingRows;
/** */
private final AtomicBoolean lastSubmitted = new AtomicBoolean();
@@ -112,11 +114,19 @@ public abstract class GridMergeIndex extends BaseIndex {
}
/**
- * @param nodeId Node ID.
+ * Set source nodes.
+ *
+ * @param nodes Nodes.
*/
- public void addSource(UUID nodeId) {
- if (remainingRows.put(nodeId, new Counter()) != null)
- throw new IllegalStateException();
+ public void setSources(Collection<ClusterNode> nodes) {
+ assert remainingRows == null;
+
+ remainingRows = new HashMap<>(nodes.size(), 1f);
+
+ for (ClusterNode node : nodes) {
+ if (remainingRows.put(node.id(), new Counter()) != null)
+ throw new IllegalStateException("Duplicate node id: " + node.id());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/bd5af493/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index a74681e..3319797 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -111,6 +111,9 @@ public class GridReduceQueryExecutor {
private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = IgniteProductVersion.fromString("1.4.0");
/** */
+ private boolean oldNodesInTopology = true;
+
+ /** */
private GridKernalContext ctx;
/** */
@@ -400,7 +403,7 @@ public class GridReduceQueryExecutor {
* @param topVer Topology version.
* @param cctx Cache context for main space.
* @param extraSpaces Extra spaces.
- * @return Data nodes or {@code null} if repartitioning started and we need to retry..
+ * @return Data nodes or {@code null} if repartitioning started and we need to retry.
*/
private Collection<ClusterNode> stableDataNodes(
AffinityTopologyVersion topVer,
@@ -409,11 +412,13 @@ public class GridReduceQueryExecutor {
) {
String space = cctx.name();
- Set<ClusterNode> nodes = new HashSet<>(dataNodes(space, topVer));
+ Collection<ClusterNode> nodes = dataNodes(space, topVer);
if (F.isEmpty(nodes))
throw new CacheException("Failed to find data nodes for cache: " + space);
+ nodes = nodes.size() > 4 ? new HashSet<>(nodes) : new ArrayList<>(nodes);
+
if (!F.isEmpty(extraSpaces)) {
for (String extraSpace : extraSpaces) {
GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
@@ -469,21 +474,26 @@ public class GridReduceQueryExecutor {
* @return {@code true} If there are old nodes in topology.
*/
private boolean oldNodesInTopology() {
- NavigableMap<IgniteProductVersion,Collection<ClusterNode>> m = ctx.discovery().topologyVersionMap();
-
- if (F.isEmpty(m))
+ if (!oldNodesInTopology)
return false;
- for (Map.Entry<IgniteProductVersion,Collection<ClusterNode>> entry : m.entrySet()) {
- if (entry.getKey().compareTo(DISTRIBUTED_JOIN_SINCE) >= 0)
- return false;
+ NavigableMap<IgniteProductVersion,Collection<ClusterNode>> m = ctx.discovery().topologyVersionMap();
- for (ClusterNode node : entry.getValue()) {
- if (!node.isClient() && !node.isDaemon())
- return true;
+ if (!F.isEmpty(m)) {
+ for (Map.Entry<IgniteProductVersion,Collection<ClusterNode>> entry : m.entrySet()) {
+ if (entry.getKey().compareTo(DISTRIBUTED_JOIN_SINCE) >= 0)
+ break;
+
+ for (ClusterNode node : entry.getValue()) {
+ if (!node.isClient() && !node.isDaemon())
+ return true;
+ }
}
}
+ // If we did not find any old nodes, we assume that no old node will join later.
+ oldNodesInTopology = false;
+
return false;
}
@@ -516,7 +526,9 @@ public class GridReduceQueryExecutor {
final String space = cctx.name();
- final QueryRun r = new QueryRun(h2.connectionForSpace(space), qry.mapQueries().size(), qry.pageSize());
+ List<GridCacheSqlQuery> mapQrys = qry.mapQueries();
+
+ final QueryRun r = new QueryRun(h2.connectionForSpace(space), mapQrys.size(), qry.pageSize());
AffinityTopologyVersion topVer = h2.readyTopologyVersion();
@@ -552,18 +564,16 @@ public class GridReduceQueryExecutor {
nodes = Collections.singleton(F.rand(nodes));
}
- int tblIdx = 0;
-
- final boolean skipMergeTbl = !qry.explain() && qry.skipMergeTable();
+ final boolean skipMergeTbl = qry.skipMergeTable() && !qry.explain();
- for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+ for (int i = 0; i < mapQrys.size(); i++) {
GridMergeIndex idx;
- if (!skipMergeTbl || qry.explain()) {
+ if (!skipMergeTbl) {
GridMergeTable tbl;
try {
- tbl = createMergeTable(r.conn, mapQry, qry.explain());
+ tbl = createMergeTable(r.conn, mapQrys.get(i), qry.explain());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -571,18 +581,17 @@ public class GridReduceQueryExecutor {
idx = tbl.getScanIndex(null);
- fakeTable(r.conn, tblIdx++).innerTable(tbl);
+ fakeTable(r.conn, i).innerTable(tbl);
}
else
idx = GridMergeIndexUnsorted.createDummy();
- for (ClusterNode node : nodes)
- idx.addSource(node.id());
+ idx.setSources(nodes);
r.idxs.add(idx);
}
- r.latch = new CountDownLatch(r.idxs.size() * nodes.size());
+ r.latch = new CountDownLatch(mapQrys.size() * nodes.size());
runs.put(qryReqId, r);
@@ -593,8 +602,6 @@ public class GridReduceQueryExecutor {
"Client node disconnected."));
}
- List<GridCacheSqlQuery> mapQrys = qry.mapQueries();
-
if (qry.explain()) {
mapQrys = new ArrayList<>(qry.mapQueries().size());
@@ -652,13 +659,13 @@ public class GridReduceQueryExecutor {
Iterator<List<?>> resIter = null;
if (!retry) {
- if (skipMergeTbl && !qry.explain()) {
- List<List<?>> res = new ArrayList<>();
-
+ if (skipMergeTbl) {
assert r.idxs.size() == 1 : r.idxs;
GridMergeIndex idx = r.idxs.get(0);
+ List<List<?>> res = new ArrayList<>((int)idx.getRowCountApproximation());
+
Cursor cur = idx.findInStream(null, null);
while (cur.next()) {
@@ -747,7 +754,7 @@ public class GridReduceQueryExecutor {
U.warn(log, "Query run was already removed: " + qryReqId);
if (!skipMergeTbl) {
- for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++)
+ for (int i = 0; i < mapQrys.size(); i++)
fakeTable(null, i).innerTable(null); // Drop all merge tables.
}
}