You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/09/28 20:38:10 UTC

[GitHub] [ignite] Mmuzaf commented on a change in pull request #9081: IGNITE-14703. Add MergeSort distributed cache query reducer.

Mmuzaf commented on a change in pull request #9081:
URL: https://github.com/apache/ignite/pull/9081#discussion_r717459702



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
##########
@@ -69,205 +53,66 @@ protected GridCacheDistributedQueryFuture(GridCacheContext<K, V> ctx, long reqId
         GridCacheQueryManager<K, V> mgr = ctx.queries();
 
         assert mgr != null;
-
-        synchronized (this) {
-            for (ClusterNode node : nodes)
-                subgrid.add(node.id());
-        }
     }
 
     /** {@inheritDoc} */
-    @Override protected void cancelQuery() throws IgniteCheckedException {
-        final GridCacheQueryManager<K, V> qryMgr = cctx.queries();
-
-        assert qryMgr != null;
-
-        try {
-            Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
-            Collection<ClusterNode> nodes;
-
-            synchronized (this) {
-                nodes = F.retain(allNodes, true,
-                    new P1<ClusterNode>() {
-                        @Override public boolean apply(ClusterNode node) {
-                            return !cctx.localNodeId().equals(node.id()) && subgrid.contains(node.id());
-                        }
-                    }
-                );
-
-                subgrid.clear();
-            }
-
-            final GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(),
-                reqId,
-                fields(),
-                qryMgr.queryTopologyVersion(),
-                cctx.deploymentEnabled());
-
-            // Process cancel query directly (without sending) for local node,
-            cctx.closures().callLocalSafe(new GridPlainCallable<Object>() {
-                @Override public Object call() {
-                    qryMgr.processQueryRequest(cctx.localNodeId(), req);
+    @Override protected void cancelQuery() {
+        reducer.cancel();
 
-                    return null;
-                }
-            });
-
-            if (!nodes.isEmpty()) {
-                for (ClusterNode node : nodes) {
-                    try {
-                        cctx.io().send(node, req, cctx.ioPolicy());
-                    }
-                    catch (IgniteCheckedException e) {
-                        if (cctx.io().checkNodeLeft(node.id(), e, false)) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to send cancel request, node failed: " + node);
-                        }
-                        else
-                            U.error(log, "Failed to send cancel request [node=" + node + ']', e);
-                    }
-                }
-            }
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to send cancel request (will cancel query in any case).", e);
-        }
-
-        qryMgr.onQueryFutureCanceled(reqId);
+        cctx.queries().onQueryFutureCanceled(reqId);
 
         clear();
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
     @Override protected void onNodeLeft(UUID nodeId) {
-        boolean callOnPage;
-
-        synchronized (this) {
-            callOnPage = !loc && subgrid.contains(nodeId);
-        }
+        boolean qryNode = reducer.remoteQueryNode(nodeId);
 
-        if (callOnPage)
-            onPage(nodeId, Collections.emptyList(),
+        if (qryNode)

Review comment:
       Missing braces.
   
   https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines#CodingGuidelines-BracesandIndentation

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/AbstractDistributedCacheQueryReducer.java
##########
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryPageRequester;
+import org.apache.ignite.internal.processors.cache.query.DistributedCacheQueryReducer;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryFutureAdapter;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Abstract class for distributed reducer implementations.
+ */
+abstract class AbstractDistributedCacheQueryReducer<R> implements DistributedCacheQueryReducer<R> {
+    /** Collection of streams that don't deliver all pages yet. */
+    private final Map<UUID, NodePageStream<R>> remoteStreams;

Review comment:
       Maxim, I agree with Andrey. Your solution will work; however, I think it's better to merge these collections for code simplicity. We can easily filter out the closed streams with some kind of `close` flag (since you named them `streams`, they should probably implement the `Closeable` interface too).

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
##########
@@ -776,135 +721,24 @@ private Object convert(Object obj) {
     }
 
     /** {@inheritDoc} */
-    @Override public CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry,
-        Collection<ClusterNode> nodes) {
-        assert cctx.config().getCacheMode() != LOCAL;
-
-        if (log.isDebugEnabled())
-            log.debug("Executing distributed query: " + qry);
-
-        long reqId = cctx.io().nextIoId();
-
-        final GridCacheDistributedFieldsQueryFuture fut =
-            new GridCacheDistributedFieldsQueryFuture(cctx, reqId, qry, nodes);
-
-        try {
-            qry.query().validate();
-
-            GridCacheQueryRequest req = new GridCacheQueryRequest(
-                cctx.cacheId(),
-                reqId,
-                cctx.name(),
-                qry.query().type(),
-                true,
-                qry.query().clause(),
-                null,
-                qry.query().limit(),
-                null,
-                null,
-                null,
-                qry.reducer(),
-                qry.transform(),
-                qry.query().pageSize(),
-                qry.query().includeBackups(),
-                qry.arguments(),
-                qry.query().includeMetadata(),
-                qry.query().keepBinary(),
-                qry.query().taskHash(),
-                queryTopologyVersion(),
-                null,
-                cctx.deploymentEnabled(),
-                qry.query().isDataPageScanEnabled());
-
-            addQueryFuture(req.id(), fut);
-
-            final Object topic = topic(cctx.nodeId(), req.id());
-
-            cctx.io().addOrderedCacheHandler(cctx.shared(), topic, resHnd);
-
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
-                    cctx.io().removeOrderedHandler(false, topic);
-                }
-            });
-
-            sendRequest(fut, req, nodes);
-        }
-        catch (IgniteCheckedException e) {
-            fut.onDone(e);
-        }
-
-        return fut;
+    @Override public CacheQueryFuture<?> queryFieldsDistributed(GridCacheQueryBean qry, Collection<ClusterNode> nodes) {
+        return queryDistributed(qry, nodes, true);
     }
 
-    /**
-     * Sends query request.
-     *
-     * @param fut Distributed future.
-     * @param req Request.
-     * @param nodes Nodes.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private void sendRequest(
-        final GridCacheDistributedQueryFuture<?, ?, ?> fut,
-        final GridCacheQueryRequest req,
-        Collection<ClusterNode> nodes
-    ) throws IgniteCheckedException {
-        assert fut != null;
-        assert req != null;
-        assert nodes != null;
-
-        final UUID locNodeId = cctx.localNodeId();
-
-        ClusterNode locNode = null;
-
-        Collection<ClusterNode> rmtNodes = null;
+    /** Creates a reducer depends on query type. */
+    private DistributedCacheQueryReducer createReducer(GridCacheQueryType qryType, long reqId,

Review comment:
       Can you fix the raw type usage?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/reducer/NodePageStream.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.reducer;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * This class provides an interface {@link #nextPage()} that returns a {@link NodePage} of cache query result
+ * from single node. Pages are stored in a queue, after polling a queue it tries to load a new page.
+ */
+class NodePageStream<R> {
+    /** Queue of data of results pages. */
+    private final Queue<NodePage<R>> queue = new LinkedList<>();
+
+    /** Node ID to stream pages */
+    private final UUID nodeId;
+
+    /**
+     * List of nodes that respons with cache query result pages. This collection should be cleaned before sending new

Review comment:
       It seems the JavaDoc doesn't correspond to the class param and should be fixed.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPageRequester.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class is responsible for sending request for query result pages to remote nodes.
+ */
+public class CacheQueryPageRequester {

Review comment:
       Could we move all the methods of this class to `GridCacheDistributedQueryManager`? There seems to be no reason to share them through a dedicated class.

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/CacheQueryPageRequester.java
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This class is responsible for sending request for query result pages to remote nodes.
+ */
+public class CacheQueryPageRequester {
+    /** Cache context. */
+    private final GridCacheContext<?, ?> cctx;
+
+    /** Ignite logger. */
+    private final IgniteLogger log;
+
+    /** Local handler of cache query request. */
+    private final Consumer<GridCacheQueryRequest> sendLoc;
+
+    /**
+     * @param cctx Cache context.
+     * @param sendLoc Local handler of cache query request.
+     */
+    CacheQueryPageRequester(GridCacheContext<?, ?> cctx, Consumer<GridCacheQueryRequest> sendLoc) {
+        this.cctx = cctx;
+        this.sendLoc = sendLoc;
+
+        log = cctx.kernalContext().config().getGridLogger();
+    }
+
+    /**
+     * Send initial query request to specified nodes.
+     *
+     * @param reqId Request (cache query) ID.
+     * @param fut Cache query future, contains query info.
+     * @param nodes Collection of nodes to send a request.
+     */
+    public void initRequestPages(long reqId, GridCacheDistributedQueryFuture<?, ?, ?> fut,
+        Collection<ClusterNode> nodes) throws IgniteCheckedException {
+        GridCacheQueryBean bean = fut.query();
+        GridCacheQueryAdapter<?> qry = bean.query();
+
+        boolean deployFilterOrTransformer = (qry.scanFilter() != null || qry.transform() != null)
+            && cctx.gridDeploy().enabled();
+
+        GridCacheQueryRequest req = new GridCacheQueryRequest(
+            cctx.cacheId(),
+            reqId,
+            cctx.name(),
+            qry.type(),
+            fut.fields(),
+            qry.clause(),
+            qry.idxQryDesc(),
+            qry.limit(),
+            qry.queryClassName(),
+            qry.scanFilter(),
+            qry.partition(),
+            bean.reducer(),
+            qry.transform(),
+            qry.pageSize(),
+            qry.includeBackups(),
+            bean.arguments(),
+            qry.includeMetadata(),
+            qry.keepBinary(),
+            qry.taskHash(),
+            cctx.startTopologyVersion(),
+            qry.mvccSnapshot(),
+            // Force deployment anyway if scan query is used.
+            cctx.deploymentEnabled() || deployFilterOrTransformer,
+            qry.isDataPageScanEnabled());
+
+        List<UUID> sendNodes = new ArrayList<>();
+
+        for (ClusterNode n: nodes)
+            sendNodes.add(n.id());
+
+        sendRequest(fut, req, sendNodes);
+    }
+
+    /**
+     * Send request for fetching query result pages to specified nodes.
+     *
+     * @param reqId Request (cache query) ID.
+     * @param nodes Collection of nodes to send a request.
+     * @param all If {@code true} then request for all pages, otherwise for single only.
+     */
+    public void requestPages(long reqId, GridCacheQueryFutureAdapter<?, ?, ?> fut, Collection<UUID> nodes, boolean all) {
+        GridCacheQueryAdapter<?> qry = fut.query().query();
+
+        GridCacheQueryRequest req = new GridCacheQueryRequest(
+            cctx.cacheId(),
+            reqId,
+            cctx.name(),
+            qry.pageSize(),
+            qry.includeBackups(),
+            fut.fields(),
+            all,
+            qry.keepBinary(),
+            qry.taskHash(),
+            cctx.startTopologyVersion(),
+            // Force deployment anyway if scan query is used.
+            cctx.deploymentEnabled() || (qry.scanFilter() != null && cctx.gridDeploy().enabled()),
+            qry.isDataPageScanEnabled());
+
+        try {
+            sendRequest(fut, req, nodes);
+
+        } catch (IgniteCheckedException e) {
+            fut.onDone(e);
+        }
+    }
+
+    /**
+     * Send cancel query request, so no new pages will be sent.
+     *
+     * @param reqId Query request ID.
+     * @param nodes Collection of nodes to send the cancel request.
+     * @param fieldsQry Whether query is a fields query.
+     *
+     */
+    public void cancelQuery(long reqId, Collection<UUID> nodes, boolean fieldsQry) {
+        GridCacheQueryManager<?, ?> qryMgr = cctx.queries();
+
+        assert qryMgr != null;
+
+        try {
+            GridCacheQueryRequest req = new GridCacheQueryRequest(cctx.cacheId(),
+                reqId,
+                fieldsQry,
+                cctx.startTopologyVersion(),
+                cctx.deploymentEnabled());
+
+            sendRequest(null, req, nodes);
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to send cancel request (will cancel query in any case).", e);
+        }
+    }
+
+    /**
+     * Sends query request.
+     *
+     * @param fut Cache query future. {@code null} in case of cancel request.
+     * @param req Request.
+     * @param nodes Nodes.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void sendRequest(
+        @Nullable GridCacheQueryFutureAdapter<?, ?, ?> fut,
+        GridCacheQueryRequest req,
+        Collection<UUID> nodes
+    ) throws IgniteCheckedException {
+        assert req != null;
+        assert nodes != null;
+
+        UUID locNodeId = cctx.localNodeId();
+
+        boolean loc = false;
+
+        for (UUID nodeId : nodes) {
+            if (nodeId.equals(locNodeId))
+                loc = true;
+            else {
+                if (req.cancel())
+                    sendNodeCancelRequest(nodeId, req);
+                else if (!sendNodePageRequest(nodeId, req, fut))
+                    return;
+            }
+        }
+
+        if (loc)
+            sendLoc.accept(req);
+    }
+
+    /** */
+    private void sendNodeCancelRequest(UUID nodeId, GridCacheQueryRequest req) throws IgniteCheckedException {
+        try {
+            cctx.io().send(nodeId, req, GridIoPolicy.QUERY_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            if (cctx.io().checkNodeLeft(nodeId, e, false)) {

Review comment:
       It seems this is a legacy check; however, it can also be removed, since the DiscoveryEventListener already exists for the class that handles the futures.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org