You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/18 08:54:14 UTC

[07/50] incubator-ignite git commit: ignite-484 - explicit partitions list

ignite-484 - explicit partitions list


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/20ec22b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/20ec22b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/20ec22b8

Branch: refs/heads/ignite-sprint-6
Commit: 20ec22b85cddf8dfb5c453758aa83dcb49fbbd32
Parents: a12aadf
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon May 25 03:20:54 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon May 25 03:20:54 2015 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryIndexing.java     |  5 ++-
 .../processors/query/GridQueryProcessor.java    |  4 +-
 .../h2/twostep/messages/GridQueryRequest.java   | 36 +++++++++++++--
 .../processors/query/h2/IgniteH2Indexing.java   | 43 ++++++++++++++++--
 .../query/h2/twostep/GridMapQueryExecutor.java  | 47 ++++++++++++++++++--
 .../h2/twostep/GridReduceQueryExecutor.java     | 33 +++++++++++++-
 6 files changed, 151 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 6b1401d..216773e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -222,8 +222,11 @@ public interface GridQueryIndexing {
     /**
      * Returns backup filter.
      *
+     * @param caches Cache names, in the same order as the entries of {@code parts} (may be {@code null}).
      * @param topVer Topology version.
+     * @param parts Explicit partition IDs per cache, or {@code null} when no explicit partitions are given.
      * @return Backup filter.
      */
-    public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer);
+    public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer,
+        List<int[]> parts);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index afd0386..202ec75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -603,7 +603,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
                 throw new CacheException("Failed to find SQL table for type: " + type);
 
             final GridCloseableIterator<IgniteBiTuple<K,V>> i = idx.query(space, sqlQry, F.asList(params), typeDesc,
-                idx.backupFilter(null));
+                idx.backupFilter(null, null, null));
 
             if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                 ctx.event().record(new CacheQueryExecutedEvent<>(
@@ -670,7 +670,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
             String sql = qry.getSql();
             Object[] args = qry.getArgs();
 
-            GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter(null));
+            GridQueryFieldsResult res = idx.queryFields(space, sql, F.asList(args), idx.backupFilter(null, null, null));
 
             if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
                 ctx.event().record(new CacheQueryExecutedEvent<>(

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 2d53944..74b4392 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -54,7 +54,11 @@ public class GridQueryRequest implements Message {
 
     /** */
     @GridDirectCollection(String.class)
-    private Collection<String> extraSpaces;
+    private List<String> extraSpaces;
+
+    /** */
+    @GridDirectCollection(int[].class)
+    private List<int[]> parts;
 
     /**
      * Default constructor.
@@ -70,6 +74,7 @@ public class GridQueryRequest implements Message {
      * @param qrys Queries.
      * @param topVer Topology version.
      * @param extraSpaces All space names participating in query other than {@code space}.
+     * @param parts Optional partitions for unstable topology.
      */
     public GridQueryRequest(
         long reqId,
@@ -77,7 +82,8 @@ public class GridQueryRequest implements Message {
         String space,
         Collection<GridCacheSqlQuery> qrys,
         AffinityTopologyVersion topVer,
-        List<String> extraSpaces) {
+        List<String> extraSpaces,
+        List<int[]> parts) {
         this.reqId = reqId;
         this.pageSize = pageSize;
         this.space = space;
@@ -85,12 +91,20 @@ public class GridQueryRequest implements Message {
         this.qrys = qrys;
         this.topVer = topVer;
         this.extraSpaces = extraSpaces;
+        this.parts = parts;
+    }
+
+    /**
+     * @return Partitions for {@link #space()} followed by each of {@link #extraSpaces()}, or {@code null} if not set.
+     */
+    public List<int[]> partitions() {
+        return parts;
     }
 
     /**
      * @return All extra space names participating in query other than {@link #space()}.
      */
-    public Collection<String> extraSpaces() {
+    public List<String> extraSpaces() {
         return extraSpaces;
     }
 
@@ -181,6 +195,12 @@ public class GridQueryRequest implements Message {
                     return false;
 
                 writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("partitions", parts, MessageCollectionItemType.INT_ARR))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -241,6 +261,14 @@ public class GridQueryRequest implements Message {
                     return false;
 
                 reader.incrementState();
+
+            case 6:
+                parts = reader.readCollection("partitions", MessageCollectionItemType.INT_ARR);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return true;
@@ -253,6 +281,6 @@ public class GridQueryRequest implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 6;
+        return 7;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 1cfc314..0ee9876 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1354,21 +1354,56 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /** {@inheritDoc} */
-    @Override public IndexingQueryFilter backupFilter(AffinityTopologyVersion topVer) {
+    @Override public IndexingQueryFilter backupFilter(
+        @Nullable final List<String> caches,
+        @Nullable final AffinityTopologyVersion topVer,
+        @Nullable final List<int[]> parts
+    ) {
         final AffinityTopologyVersion topVer0 = topVer != null ? topVer : AffinityTopologyVersion.NONE;
 
         return new IndexingQueryFilter() {
             @Nullable @Override public <K, V> IgniteBiPredicate<K, V> forSpace(String spaceName) {
                 final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(spaceName);
 
-                if (cache.context().isReplicated() || cache.configuration().getBackups() == 0)
+                if (cache.context().isReplicated() || (cache.configuration().getBackups() == 0 && parts == null))
                     return null;
 
+                final GridCacheAffinityManager aff = cache.context().affinity();
+
+                if (parts != null) {
+                    int idx = caches.indexOf(spaceName);
+
+                    final int[] parts0 = parts.get(idx);
+
+                    if (parts0.length < 64) {
+                        return new IgniteBiPredicate<K,V>() {
+                            @Override public boolean apply(K k, V v) {
+                                int p = aff.partition(k);
+
+                                for (int p0 : parts0) {
+                                    if (p0 == p)
+                                        return true;
+                                }
+
+                                return false;
+                            }
+                        };
+                    }
+
+                    return new IgniteBiPredicate<K,V>() {
+                        @Override public boolean apply(K k, V v) {
+                            int p = aff.partition(k);
+
+                            return Arrays.binarySearch(parts0, p) >= 0;
+                        }
+                    };
+                }
+
                 final ClusterNode locNode = ctx.discovery().localNode();
 
                 return new IgniteBiPredicate<K, V>() {
                     @Override public boolean apply(K k, V v) {
-                        return cache.context().affinity().primary(locNode, k, topVer0);
+                        return aff.primary(locNode, k, topVer0);
                     }
                 };
             }
@@ -1392,7 +1427,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @return Current topology version.
      */
     public AffinityTopologyVersion topologyVersion() {
-        return ctx.discovery().topologyVersionEx();
+        return ctx.cache().context().exchange().readyAffinityVersion();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 112949f..06bad76 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -217,16 +217,22 @@ public class GridMapQueryExecutor {
     /**
      * @param cacheNames Cache names.
      * @param topVer Topology version.
+     * @param parts Explicit partitions.
      * @param reserved Reserved list.
      * @return {@code true} If all the needed partitions successfully reserved.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean reservePartitions(Collection<String> cacheNames,  AffinityTopologyVersion topVer,
+    private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, List<int[]> parts,
         List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
+        assert parts == null || parts.size() == cacheNames.size();
+
+        int i = 0;
+
         for (String cacheName : cacheNames) {
             GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
 
-            Set<Integer> partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+            Collection<Integer> partIds = parts != null ? wrap(parts.get(i++)) :
+                cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
 
             for (int partId : partIds) {
                 GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
@@ -250,6 +256,37 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * @param ints Array to view as a collection; it is read in place, not copied.
+     * @return Collection over {@code ints} in array order whose iterator does not support removal.
+     */
+    private static Collection<Integer> wrap(final int[] ints) {
+        return new AbstractCollection<Integer>() {
+            @Override public Iterator<Integer> iterator() {
+                return new Iterator<Integer>() {
+                    /** Index of the next array element to return. */
+                    private int i = 0;
+
+                    @Override public boolean hasNext() {
+                        return i < ints.length;
+                    }
+
+                    @Override public Integer next() {
+                        return ints[i++];
+                    }
+
+                    @Override public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+
+            @Override public int size() {
+                return ints.length;
+            }
+        };
+    }
+
+    /**
      * Executing queries locally.
      *
      * @param node Node.
@@ -280,6 +317,8 @@ public class GridMapQueryExecutor {
                 throw new CacheException(e);
             }
 
+            List<String> caches = (List<String>)F.concat(true, req.space(), req.extraSpaces());
+
             // Topology version can be null in rolling restart with previous version!
             final AffinityTopologyVersion topVer = req.topologyVersion();
 
@@ -288,7 +327,7 @@ public class GridMapQueryExecutor {
                 h2.awaitForCacheAffinity(topVer);
 
                 // Reserve primary partitions.
-                if (!reservePartitions(F.concat(true, req.space(), req.extraSpaces()), topVer, reserved)) {
+                if (!reservePartitions(caches, topVer, req.partitions(), reserved)) {
                     sendRetry(node, req.requestId());
 
                     return;
@@ -303,7 +342,7 @@ public class GridMapQueryExecutor {
             if (nodeRess.put(req.requestId(), qr) != null)
                 throw new IllegalStateException();
 
-            h2.setFilters(h2.backupFilter(topVer));
+            h2.setFilters(h2.backupFilter(caches, topVer, req.partitions()));
 
             // TODO Prepare snapshots for all the needed tables before the run.
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/20ec22b8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index eb6db88..0836a75 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -279,6 +279,21 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param set Set to copy; elements are auto-unboxed, so it must not contain {@code null}.
+     * @return New array holding the elements in the set's iteration order (no sorting is applied here).
+     */
+    private static int[] unbox(Set<Integer> set) {
+        int[] arr = new int[set.size()];
+
+        int i = 0;
+
+        for (int x : set)
+            arr[i++] = x;
+
+        return arr;
+    }
+
+    /**
      * @param cctx Cache context.
      * @param qry Query.
      * @return Cursor.
@@ -304,12 +319,27 @@ public class GridReduceQueryExecutor {
             if (F.isEmpty(nodes))
                 throw new CacheException("No data nodes found for cache: " + space);
 
+            List<String> extraSpaces = extraSpaces(space, qry.spaces());
+
+            List<int[]> parts = null;
+
             if (cctx.isReplicated() || qry.explain()) {
                 assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
 
                 // Select random data node to run query on a replicated data or get EXPLAIN PLAN from a single node.
                 nodes = Collections.singleton(F.rand(nodes));
             }
+            else if (ctx.cache().context().exchange().hasPendingExchange()) { // TODO isActive ??
+                parts = new ArrayList<>(extraSpaces == null ? 1 : extraSpaces.size() + 1);
+
+                parts.add(unbox(cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+
+                if (extraSpaces != null) {
+                    for (String extraSpace : extraSpaces)
+                        parts.add(unbox(ctx.cache().internalCache(extraSpace).context()
+                            .affinity().primaryPartitions(ctx.localNodeId(), topVer)));
+                }
+            }
 
             for (GridCacheSqlQuery mapQry : qry.mapQueries()) {
                 GridMergeTable tbl;
@@ -355,8 +385,7 @@ public class GridReduceQueryExecutor {
                 boolean ok = false;
 
                 try {
-                    send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer,
-                        extraSpaces(space, qry.spaces())));
+                    send(nodes, new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, parts));
 
                     ok = true;
                 }