You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/06/03 20:04:26 UTC

[GitHub] [ignite] AMashenkov commented on a change in pull request #9081: IGNITE-14703. Add MergeSort distributed cache query reducer.

AMashenkov commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r633556166



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -169,103 +97,30 @@ protected GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean onPage(UUID nodeId, boolean last) {
-        assert Thread.holdsLock(this);
-
-        if (!loc) {
-            rcvd.add(nodeId);
-
-            if (rcvd.containsAll(subgrid))
-                firstPageLatch.countDown();
-        }
-
-        boolean futFinish;
-
-        if (last) {
-            futFinish = loc || (subgrid.remove(nodeId) && subgrid.isEmpty());
-
-            if (futFinish)
-                firstPageLatch.countDown();
-        }
-        else
-            futFinish = false;
-
-        return futFinish;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void loadPage() {
-        assert !Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && rcvd.containsAll(subgrid)) {
-                rcvd.clear();
-
-                nodes = nodes();
-            }
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, false);
+    @Override protected boolean onPage(@Nullable UUID nodeId, boolean last) {
+        return reducer.onPage(nodeId, last);
     }
 
     /** {@inheritDoc} */
     @Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
-        assert !Thread.holdsLock(this);
-
-        U.await(firstPageLatch);
-
-        Collection<ClusterNode> nodes = null;
-
-        synchronized (this) {
-            if (!isDone() && !subgrid.isEmpty())
-                nodes = nodes();
-        }
-
-        if (nodes != null)
-            cctx.queries().loadPage(reqId, qry.query(), nodes, true);
-    }
-
-    /**
-     * @return Nodes to send requests to.
-     */
-    private Collection<ClusterNode> nodes() {
-        assert Thread.holdsLock(this);
-
-        Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());
-
-        for (UUID nodeId : subgrid) {
-            ClusterNode node = cctx.discovery().node(nodeId);
-
-            if (node != null)
-                nodes.add(node);
-        }
-
-        return nodes;
+        reducer.loadAll();
     }
 
     /** {@inheritDoc} */
-    @Override public boolean onDone(Collection<R> res, Throwable err) {
-        boolean done = super.onDone(res, err);
-
-        // Must release the lath after onDone() in order for a waiting thread to see an exception, if any.
-        firstPageLatch.countDown();
-
-        return done;
+    @Override protected Reducer<R> reducer() {
+        return reducer;
     }
 
     /** {@inheritDoc} */
     @Override public boolean onCancelled() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         return super.onCancelled();
     }
 
     /** {@inheritDoc} */
     @Override public void onTimeout() {
-        firstPageLatch.countDown();
+        reducer.onLastPage();
 
         super.onTimeout();
     }

Review comment:
       Do we need all these callbacks in QueryFutureAdapter just to delegate calls to reducer
   and having reducer() at same time?
   I see a pattern you already use: reducer().onPage() and similar.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org