You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/06/18 10:44:37 UTC
[38/50] incubator-ignite git commit: Merge branches 'ignite-484-1'
and 'ignite-sprint-6' of
https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1
Merge branches 'ignite-484-1' and 'ignite-sprint-6' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-484-1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/efb42447
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/efb42447
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/efb42447
Branch: refs/heads/ignite-sprint-7
Commit: efb4244779b94c9c9f35c63708e4a41da2430bce
Parents: 94060c9 4298238 af829d0
Author: S.Vladykin <sv...@gridgain.com>
Authored: Wed Jun 17 19:50:12 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed Jun 17 19:50:12 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 4 +
.../processors/cache/IgniteCacheProxy.java | 7 +
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../cache/transactions/IgniteTxHandler.java | 2 +-
.../transactions/IgniteTxLocalAdapter.java | 12 +-
.../dr/IgniteDrDataStreamerCacheUpdater.java | 7 +-
.../CacheStoreUsageMultinodeAbstractTest.java | 305 +++++++++++++++++++
...eUsageMultinodeDynamicStartAbstractTest.java | 169 ++++++++++
...oreUsageMultinodeDynamicStartAtomicTest.java | 32 ++
...heStoreUsageMultinodeDynamicStartTxTest.java | 32 ++
...reUsageMultinodeStaticStartAbstractTest.java | 158 ++++++++++
...toreUsageMultinodeStaticStartAtomicTest.java | 32 ++
...cheStoreUsageMultinodeStaticStartTxTest.java | 32 ++
.../testsuites/IgniteCacheTestSuite4.java | 4 +
.../h2/twostep/GridReduceQueryExecutor.java | 3 +-
15 files changed, 793 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/efb42447/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 6635dde,6c407d9,11054b7..b956167
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@@@ -273,514 -273,477 -265,113 +273,515 @@@@ public class GridReduceQueryExecutor
}
/**
+ * @param r Query run.
+ * @param retryVer Retry version.
+ * @param nodeId Node ID.
+ */
+ private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
+ r.state(retryVer, nodeId);
+ }
+
+ /**
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
+ * @return {@code true} If preloading is active.
+ */
+ private boolean isPreloadingActive(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+ if (hasMovingPartitions(cctx))
+ return true;
+
+ if (extraSpaces != null) {
+ for (String extraSpace : extraSpaces) {
+ if (hasMovingPartitions(cacheContext(extraSpace)))
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * @return {@code true} If any node's partition map for the given cache context has moving partitions.
+ */
+ private boolean hasMovingPartitions(GridCacheContext<?,?> cctx) {
+ GridDhtPartitionFullMap fullMap = cctx.topology().partitionMap(false);
+
+ for (GridDhtPartitionMap map : fullMap.values()) {
+ if (map.hasMovingPartitions())
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param name Cache name.
+ * @return Cache context.
+ */
+ private GridCacheContext<?,?> cacheContext(String name) {
+ return ctx.cache().internalCache(name).context();
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
+ * @return Data nodes or {@code null} if repartitioning started and we need to retry.
+ */
+ private Collection<ClusterNode> stableDataNodes(
+ AffinityTopologyVersion topVer,
+ final GridCacheContext<?,?> cctx,
+ List<String> extraSpaces
+ ) {
+ String space = cctx.name();
+
- Set<ClusterNode> nodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, topVer));
++ Set<ClusterNode> nodes = new HashSet<>(dataNodes(space, topVer));
+
+ if (F.isEmpty(nodes))
+ throw new CacheException("No data nodes found for cache: " + space);
+
+ if (!F.isEmpty(extraSpaces)) {
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+
+ if (extraCctx.isLocal())
+ continue; // No consistency guaranties for local caches.
+
+ if (cctx.isReplicated() && !extraCctx.isReplicated())
+ throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+ "with partitioned tables.");
+
- Collection<ClusterNode> extraNodes = ctx.discovery().cacheAffinityNodes(extraSpace, topVer);
++ Collection<ClusterNode> extraNodes = dataNodes(extraSpace, topVer);
+
+ if (F.isEmpty(extraNodes))
+ throw new CacheException("No data nodes found for cache: " + extraSpace);
+
+ if (cctx.isReplicated() && extraCctx.isReplicated()) {
+ nodes.retainAll(extraNodes);
+
+ if (nodes.isEmpty()) {
+ if (isPreloadingActive(cctx, extraSpaces))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+ "' have distinct set of data nodes.");
+ }
+ }
+ else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
+ if (!extraNodes.containsAll(nodes))
+ if (isPreloadingActive(cctx, extraSpaces))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+ "' have distinct set of data nodes.");
+ }
+ else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
+ if (extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes))
+ if (isPreloadingActive(cctx, extraSpaces))
+ return null; // Retry.
+ else
+ throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+ "' have distinct set of data nodes.");
+ }
+ else
+ throw new IllegalStateException();
+ }
+ }
+
+ return nodes;
+ }
+
+ /**
* @param cctx Cache context.
* @param qry Query.
+ * @param keepPortable Keep portable.
* @return Cursor.
*/
public Iterator<List<?>> query(GridCacheContext<?,?> cctx, GridCacheTwoStepQuery qry, boolean keepPortable) {
- for (;;) {
- long qryReqId = reqIdGen.incrementAndGet();
++ for (int attempt = 0;; attempt++) {
++ if (attempt != 0) {
++ try {
++ Thread.sleep(attempt * 10); // Wait for exchange.
++ }
++ catch (InterruptedException e) {
++ Thread.currentThread().interrupt();
+
- QueryRun r = new QueryRun();
++ throw new CacheException("Query was interrupted.", e);
++ }
++ }
+
- r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
+ long qryReqId = reqIdGen.incrementAndGet();
- r.tbls = new ArrayList<>(qry.mapQueries().size());
+ QueryRun r = new QueryRun();
- String space = cctx.name();
+ r.pageSize = qry.pageSize() <= 0 ? GridCacheTwoStepQuery.DFLT_PAGE_SIZE : qry.pageSize();
- r.conn = (JdbcConnection)h2.connectionForSpace(space);
+ r.tbls = new ArrayList<>(qry.mapQueries().size());
- // TODO Add topology version.
- ClusterGroup dataNodes = ctx.grid().cluster().forDataNodes(space);
+ String space = cctx.name();
- if (cctx.isReplicated() || qry.explain()) {
- assert qry.explain() || dataNodes.node(ctx.localNodeId()) == null : "We must be on a client node.";
+ r.conn = (JdbcConnection)h2.connectionForSpace(space);
- // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
- dataNodes = dataNodes.forRandom();
- }
+ AffinityTopologyVersion topVer = h2.readyTopologyVersion();
- final Collection<ClusterNode> nodes = dataNodes.nodes();
+ List<String> extraSpaces = extraSpaces(space, qry.spaces());
- for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
- GridMergeTable tbl;
+ Collection<ClusterNode> nodes;
+
+ // Explicit partition mapping for unstable topology.
+ Map<ClusterNode, IntArray> partsMap = null;
+
+ if (isPreloadingActive(cctx, extraSpaces)) {
+ if (cctx.isReplicated())
- nodes = replicatedDataNodes(cctx, extraSpaces);
++ nodes = replicatedUnstableDataNodes(cctx, extraSpaces);
+ else {
- partsMap = partitionLocations(cctx, extraSpaces);
++ partsMap = partitionedUnstableDataNodes(cctx, extraSpaces);
+
+ nodes = partsMap == null ? null : partsMap.keySet();
+ }
+ }
+ else
+ nodes = stableDataNodes(topVer, cctx, extraSpaces);
+
+ if (nodes == null)
+ continue; // Retry.
+
+ assert !nodes.isEmpty();
+
+ if (cctx.isReplicated() || qry.explain()) {
- assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
+ + assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) :
+ + "We must be on a client node.";
+
+ // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
+ nodes = Collections.singleton(F.rand(nodes));
+ }
+
+ for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
+ GridMergeTable tbl;
+
+ try {
+ tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+
+ GridMergeIndex idx = tbl.getScanIndex(null);
+
+ for (ClusterNode node : nodes)
+ idx.addSource(node.id());
+
+ r.tbls.add(tbl);
+
+ curFunTbl.set(tbl);
+ }
+
+ r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+
+ runs.put(qryReqId, r);
try {
- tbl = createFunctionTable(r.conn, mapQry, qry.explain()); // createTable(r.conn, mapQry); TODO
+ Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
+
+ if (qry.explain()) {
+ mapQrys = new ArrayList<>(qry.mapQueries().size());
+
+ for (GridCacheSqlQuery mapQry : qry.mapQueries())
+ mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
+ }
+
+ if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
+ Marshaller m = ctx.config().getMarshaller();
+
+ for (GridCacheSqlQuery mapQry : mapQrys)
+ mapQry.marshallParams(m);
+ }
+
+ boolean retry = false;
+
+ if (send(nodes,
+ new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
+ U.await(r.latch);
+
+ Object state = r.state.get();
+
+ if (state != null) {
+ if (state instanceof CacheException)
+ throw new CacheException("Failed to run map query remotely.", (CacheException)state);
+
+ if (state instanceof AffinityTopologyVersion) {
+ retry = true;
+
+ // If remote node asks us to retry then we have outdated full partition map.
+ h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state);
+ }
+ }
+ }
+ else // Send failed.
+ retry = true;
+
+ ResultSet res = null;
+
+ if (!retry) {
+ if (qry.explain())
+ return explainPlan(r.conn, space, qry);
+
+ GridCacheSqlQuery rdc = qry.reduceQuery();
+
+ res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
+ }
+
+ for (GridMergeTable tbl : r.tbls) {
+ if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
+ send(nodes, new GridQueryCancelRequest(qryReqId), null);
+
+// dropTable(r.conn, tbl.getName()); TODO
+ }
+
+ if (retry) {
+ if (Thread.currentThread().isInterrupted())
+ throw new IgniteInterruptedCheckedException("Query was interrupted.");
+
+ continue;
+ }
+
+ return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable);
}
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
+ catch (IgniteCheckedException | RuntimeException e) {
+ U.closeQuiet(r.conn);
+
- if (e instanceof CacheException)
- throw (CacheException)e;
-
+ throw new CacheException("Failed to run reduce query locally.", e);
+ }
+ finally {
+ if (!runs.remove(qryReqId, r))
+ U.warn(log, "Query run was already removed: " + qryReqId);
+
+ curFunTbl.remove();
}
+ }
+ }
+
+ /**
+ * Calculates data nodes for replicated caches on unstable topology.
+ *
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
+ * @return Collection of all data nodes owning all the caches or {@code null} for retry.
+ */
- private Collection<ClusterNode> replicatedDataNodes(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
++ private Collection<ClusterNode> replicatedUnstableDataNodes(final GridCacheContext<?,?> cctx,
++ List<String> extraSpaces) {
+ assert cctx.isReplicated() : cctx.name() + " must be replicated";
+
- Set<ClusterNode> nodes = owningReplicatedDataNodes(cctx);
++ Set<ClusterNode> nodes = replicatedUnstableDataNodes(cctx);
+
- GridMergeIndex idx = tbl.getScanIndex(null);
++ if (F.isEmpty(nodes))
++ return null; // Retry.
- for (ClusterNode node : nodes)
- idx.addSource(node.id());
+ if (!F.isEmpty(extraSpaces)) {
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
- r.tbls.add(tbl);
+ if (extraCctx.isLocal())
+ continue;
- curFunTbl.set(tbl);
+ if (!extraCctx.isReplicated())
+ throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+ "with partitioned tables.");
+
- nodes.retainAll(owningReplicatedDataNodes(extraCctx));
++ Set<ClusterNode> extraOwners = replicatedUnstableDataNodes(extraCctx);
++
++ if (F.isEmpty(extraOwners))
++ return null; // Retry.
++
++ nodes.retainAll(extraOwners);
+
+ if (nodes.isEmpty())
+ return null; // Retry.
+ }
}
- r.latch = new CountDownLatch(r.tbls.size() * nodes.size());
+ return nodes;
+ }
+
+ /**
++ * @param space Cache name.
++ * @param topVer Topology version.
++ * @return Collection of data nodes.
++ */
++ private Collection<ClusterNode> dataNodes(String space, AffinityTopologyVersion topVer) {
++ Collection<ClusterNode> res = ctx.discovery().cacheAffinityNodes(space, topVer);
+
- runs.put(qryReqId, r);
++ return res != null ? res : Collections.<ClusterNode>emptySet();
++ }
+
- try {
- Collection<GridCacheSqlQuery> mapQrys = qry.mapQueries();
++ /**
+ * Collects all the nodes owning all the partitions for the given replicated cache.
+ *
+ * @param cctx Cache context.
- * @return Owning nodes.
++ * @return Owning nodes or {@code null} if we can't find owners for some partitions.
+ */
- private Set<ClusterNode> owningReplicatedDataNodes(GridCacheContext<?,?> cctx) {
++ private Set<ClusterNode> replicatedUnstableDataNodes(GridCacheContext<?,?> cctx) {
+ assert cctx.isReplicated() : cctx.name() + " must be replicated";
+
+ String space = cctx.name();
+
- Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, NONE));
++ Set<ClusterNode> dataNodes = new HashSet<>(dataNodes(space, NONE));
+
+ if (dataNodes.isEmpty())
+ throw new CacheException("No data nodes found for cache '" + space + "'");
+
+ // Find all the nodes owning all the partitions for replicated cache.
- for (int p = 0, extraParts = cctx.affinity().partitions(); p < extraParts; p++) {
++ for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
+ List<ClusterNode> owners = cctx.topology().owners(p);
+
- if (owners.isEmpty())
- throw new CacheException("No data nodes found for cache '" + space +
- "' for partition " + p);
++ if (F.isEmpty(owners))
++ return null; // Retry.
+
+ dataNodes.retainAll(owners);
+
+ if (dataNodes.isEmpty())
- throw new CacheException("No data nodes found for cache '" + space +
- "' owning all the partitions.");
++ return null; // Retry.
+ }
+
+ return dataNodes;
+ }
+
+ /**
+ * Calculates partition mapping for partitioned cache on unstable topology.
+ *
+ * @param cctx Cache context for main space.
+ * @param extraSpaces Extra spaces.
+ * @return Partition mapping or {@code null} if we can't calculate it due to repartitioning and we need to retry.
+ */
+ @SuppressWarnings("unchecked")
- private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
++ private Map<ClusterNode, IntArray> partitionedUnstableDataNodes(final GridCacheContext<?,?> cctx,
++ List<String> extraSpaces) {
+ assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
+
+ final int partsCnt = cctx.affinity().partitions();
+
+ if (extraSpaces != null) { // Check correct number of partitions for partitioned caches.
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
- if (qry.explain()) {
- mapQrys = new ArrayList<>(qry.mapQueries().size());
+ if (extraCctx.isReplicated() || extraCctx.isLocal())
+ continue;
- for (GridCacheSqlQuery mapQry : qry.mapQueries())
- mapQrys.add(new GridCacheSqlQuery(mapQry.alias(), "EXPLAIN " + mapQry.query(), mapQry.parameters()));
+ int parts = extraCctx.affinity().partitions();
+
+ if (parts != partsCnt)
+ throw new CacheException("Number of partitions must be the same for correct collocation in " +
+ "caches " + cctx.name() + " and " + extraSpace + ".");
}
+ }
+
+ Set<ClusterNode>[] partLocs = new Set[partsCnt];
- if (nodes.size() != 1 || !F.first(nodes).isLocal()) { // Marshall params for remotes.
- Marshaller m = ctx.config().getMarshaller();
+ // Fill partition locations for main cache.
+ for (int p = 0, parts = cctx.affinity().partitions(); p < parts; p++) {
+ List<ClusterNode> owners = cctx.topology().owners(p);
- if (F.isEmpty(owners))
- for (GridCacheSqlQuery mapQry : mapQrys)
- mapQry.marshallParams(m);
++ if (F.isEmpty(owners)) {
++ if (!F.isEmpty(dataNodes(cctx.name(), NONE)))
++ return null; // Retry.
++
+ throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p);
+ }
- send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys));
+ partLocs[p] = new HashSet<>(owners);
+ }
- r.latch.await();
+ if (extraSpaces != null) {
+ // Find owner intersections for each participating partitioned cache partition.
+ // We need this for logical collocation between different partitioned caches with the same affinity.
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
- if (r.rmtErr != null)
- throw new CacheException("Failed to run map query remotely.", r.rmtErr);
+ if (extraCctx.isReplicated() || extraCctx.isLocal())
+ continue;
- if (qry.explain())
- return explainPlan(r.conn, space, qry);
+ for (int p = 0, parts = extraCctx.affinity().partitions(); p < parts; p++) {
+ List<ClusterNode> owners = extraCctx.topology().owners(p);
- if (F.isEmpty(owners))
- GridCacheSqlQuery rdc = qry.reduceQuery();
++ if (F.isEmpty(owners)) {
++ if (!F.isEmpty(dataNodes(extraSpace, NONE)))
++ return null; // Retry.
+
- final ResultSet res = h2.executeSqlQueryWithTimer(space, r.conn, rdc.query(), F.asList(rdc.parameters()));
+ throw new CacheException("No data nodes found for cache '" + extraSpace +
+ "' for partition " + p);
++ }
- for (GridMergeTable tbl : r.tbls) {
- if (!tbl.getScanIndex(null).fetchedAll()) // We have to explicitly cancel queries on remote nodes.
- send(nodes, new GridQueryCancelRequest(qryReqId));
+ if (partLocs[p] == null)
+ partLocs[p] = new HashSet<>(owners);
+ else {
+ partLocs[p].retainAll(owners); // Intersection of owners.
-// dropTable(r.conn, tbl.getName()); TODO
+ if (partLocs[p].isEmpty())
+ return null; // Intersection is empty -> retry.
+ }
+ }
}
- return new GridQueryCacheObjectsIterator(new Iter(res), cctx, keepPortable);
+ // Filter out nodes on which not all the replicated caches are loaded.
+ for (String extraSpace : extraSpaces) {
+ GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+
+ if (!extraCctx.isReplicated())
+ continue;
+
- Set<ClusterNode> dataNodes = owningReplicatedDataNodes(extraCctx);
++ Set<ClusterNode> dataNodes = replicatedUnstableDataNodes(extraCctx);
++
++ if (F.isEmpty(dataNodes))
++ return null; // Retry.
+
+ for (Set<ClusterNode> partLoc : partLocs) {
+ partLoc.retainAll(dataNodes);
+
+ if (partLoc.isEmpty())
+ return null; // Retry.
+ }
+ }
}
- catch (IgniteCheckedException | InterruptedException | RuntimeException e) {
- U.closeQuiet(r.conn);
- if (e instanceof CacheException)
- throw (CacheException)e;
+ // Collect the final partitions mapping.
+ Map<ClusterNode, IntArray> res = new HashMap<>();
+
+ // Here partitions in all IntArrays will be sorted in ascending order; this is important.
+ for (int p = 0; p < partLocs.length; p++) {
+ Set<ClusterNode> pl = partLocs[p];
- throw new CacheException("Failed to run reduce query locally.", e);
+ assert !F.isEmpty(pl) : pl;
+
+ ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl);
+
+ IntArray parts = res.get(n);
+
+ if (parts == null)
+ res.put(n, parts = new IntArray());
+
+ parts.add(p);
}
- finally {
- if (!runs.remove(qryReqId, r))
- U.warn(log, "Query run was already removed: " + qryReqId);
- curFunTbl.remove();
+ return res;
+ }
+
+ /**
+ * @param mainSpace Main space.
+ * @param allSpaces All spaces.
+ * @return List of all extra spaces or {@code null} if none.
+ */
+ private List<String> extraSpaces(String mainSpace, Set<String> allSpaces) {
+ if (F.isEmpty(allSpaces) || (allSpaces.size() == 1 && allSpaces.contains(mainSpace)))
+ return null;
+
+ ArrayList<String> res = new ArrayList<>(allSpaces.size());
+
+ for (String space : allSpaces) {
+ if (!F.eq(space, mainSpace))
+ res.add(space);
}
+
+ return res;
}
/**