Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/24 10:01:33 UTC

[GitHub] [ignite] timoninmaxim commented on a change in pull request #9081: IGNITE-14703. Add MergeSort distributed cache query reducer.

timoninmaxim commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r637832326



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -169,103 +97,30 @@ protected GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean onPage(UUID nodeId, boolean last) {
-        assert Thread.holdsLock(this);
-
-        if (!loc) {
-            rcvd.add(nodeId);
-
-            if (rcvd.containsAll(subgrid))
-                firstPageLatch.countDown();
-        }
-
-        boolean futFinish;
-
-        if (last) {
-            futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty());
-
-            if (futFinish)
-                firstPageLatch.countDown();
-        }
-        else
-            futFinish = false;
-
-        return futFinish;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void loadPage() {
-        assert !Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && rcvd.containsAll(subgrid)) {
-                rcvd.clear();
-
-                nodes = nodes();
-            }
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, false);
+    @Override protected boolean onPage(@Nullable UUID nodeId, boolean last) {
+        return reducer.onPage(nodeId, last);
     }
 
     /** {@inheritDoc} */
     @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
-        assert !Thread.holdsLock(this);
-
-        U.await(firstPageLatch);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && !subgrid.isEmpty())
-                nodes = nodes();
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, true);
-    }
-
-    /**
-     * @return Nodes to send requests to.
-     */
-    private Collection<ClusterNode> nodes() {
-        assert Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());
-
-        for (UUID nodeId : subgrid) {
-            ClusterNode node = cctx.discovery().node(nodeId);
-
-            if (node != null)
-                nodes.add(node);
-        }
-
-        return nodes;
+        reducer.loadAll();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        boolean done = super.onDone(res, err);
-
-        // Must release the lath after onDone() in order for a waiting thread to see an exception, if any.
-        firstPageLatch.countDown();
-
-        return done;
+    @Override protected Reducer<R> reducer() {
+        return reducer;
     }
 
     /** {@inheritDoc} */
     @Override public boolean onCancelled() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         return super.onCancelled();
     }
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         super.onTimeout();
     }

Review comment:
       @AMashenkov hi! Thanks for having a look at this PR. Agreed, I'll move it to the base class (`GridCacheQueryFutureAdapter`); there is additional functionality in these callback methods (`onCancelled`, `onTimeout`).
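
       A rough sketch of what the move could look like, assuming "it" means the `reducer.onLastPage()` calls shown in the diff above. All class names here are hypothetical, simplified stand-ins and not the real Ignite classes; only `reducer()`, `onLastPage()`, `onCancelled()` and `onTimeout()` come from the diff:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the reducer; only the method used in the diff is modelled. */
interface Reducer {
    void onLastPage();
}

/** Simplified stand-in for the base class (GridCacheQueryFutureAdapter in the PR). */
abstract class QueryFutureAdapterSketch {
    /** Subclasses supply their reducer, mirroring the reducer() accessor in the diff. */
    protected abstract Reducer reducer();

    /** Notifies the reducer once here, so subclasses need not override this callback. */
    public boolean onCancelled() {
        reducer().onLastPage();

        return true; // Assumption: the real method returns the result of its own completion logic.
    }

    /** Same notification on timeout. */
    public void onTimeout() {
        reducer().onLastPage();
    }
}

/** Simplified stand-in for the distributed future: it only provides the reducer. */
class DistributedQueryFutureSketch extends QueryFutureAdapterSketch {
    /** Counts onLastPage() calls so the behaviour can be observed. */
    final AtomicInteger lastPageCalls = new AtomicInteger();

    private final Reducer reducer = lastPageCalls::incrementAndGet;

    @Override protected Reducer reducer() {
        return reducer;
    }
}
```

       With this shape, any future that extends the base class gets the `onLastPage()` notification on cancel and timeout without repeating the overrides.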