Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2021/05/13 14:08:32 UTC

[GitHub] [ignite] alex-plekhanov opened a new pull request #9095: IGNITE-14638 Support for INTERSECT operator

alex-plekhanov opened a new pull request #9095:
URL: https://github.com/apache/ignite/pull/9095


   Thank you for submitting the pull request to Apache Ignite.
   
   In order to streamline the review of the contribution,
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes the changes that have been made.
   The description explains _WHAT_ was changed and _WHY_, rather than _HOW_.
   - [ ] The pull request title is treated as the final commit message.
   The following pattern must be used: `IGNITE-XXXX Change summary`, where `XXXX` is the JIRA issue number.
   - [ ] A reviewer has been mentioned through the JIRA comments
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)).
   - [ ] The pull request has been checked by the Teamcity Bot and
   the `green visa` has been attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html)).
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask for advice in the _#ignite_ channel on http://asf.slack.com.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r641382373



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MAP phase of set op (MINUS, INTERSECT).
+ */
+public interface IgniteMapSetOp extends IgniteSetOp {
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean rewindable = inputTraits.stream()
+            .map(TraitUtils::rewindability)
+            .allMatch(RewindabilityTrait::rewindable);
+
+        if (rewindable)
+            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
+            return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
+
+        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
+            return ImmutableList.of(); // Mixing of single and random is prohibited.

Review comment:
       I think this plan is not correct:
   ```
   IgniteReduceMinus(all=[false])
     IgniteExchange(distribution=[single])
       IgniteMapMinus(all=[false])
         IgniteExchange(distribution=[random])
           IgniteTableScan(table=[[PUBLIC, SINGLE_TBL1]])
         IgniteTableScan(table=[[PUBLIC, RANDOM_TBL1]])
   ```
   This extra exchange with `distribution=[random]` is redundant: we don't need to send any data anywhere here. Instead, the `single` distribution should satisfy the `random` distribution, which makes plans like this possible:
   ```
   IgniteReduceMinus(all=[false])
     IgniteExchange(distribution=[single])
       IgniteMapMinus(all=[false])
         IgniteTableScan(table=[[PUBLIC, SINGLE_TBL1]])
         IgniteTableScan(table=[[PUBLIC, RANDOM_TBL1]])
   ```
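Why a `single` input could be treated as a particular `random` layout: in the two-phase set op, the MAP phase only counts keys per input set on each node, and the REDUCE phase sums those counts, so the final result does not depend on how rows are spread across nodes, and "all rows on one node" is just one valid spread. The sketch below illustrates this for EXCEPT DISTINCT under that assumption. The class and method names are hypothetical, not Ignite classes; the per-key `int[2]` (count in the first set, count in all other sets) mirrors the layout described in the `MinusNode` grouping comment.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

/** Hypothetical sketch of a two-phase (MAP/REDUCE) EXCEPT DISTINCT; not Ignite's code. */
public class TwoPhaseExceptSketch {
    /** MAP phase on one node: per key, {count in set 0, count in all other sets}. */
    static Map<String, int[]> map(List<List<String>> localSets) {
        Map<String, int[]> counts = new HashMap<>();

        for (int setIdx = 0; setIdx < localSets.size(); setIdx++) {
            for (String key : localSets.get(setIdx)) {
                int[] cnt = counts.computeIfAbsent(key, k -> new int[2]);

                cnt[setIdx == 0 ? 0 : 1]++;
            }
        }

        return counts;
    }

    /** REDUCE phase: sum the per-node counts, keep keys seen in set 0 but in no other set. */
    static SortedSet<String> reduce(List<Map<String, int[]>> partials) {
        Map<String, int[]> total = new HashMap<>();

        for (Map<String, int[]> partial : partials) {
            partial.forEach((key, cnt) -> {
                int[] sum = total.computeIfAbsent(key, k -> new int[2]);

                sum[0] += cnt[0];
                sum[1] += cnt[1];
            });
        }

        SortedSet<String> res = new TreeSet<>();

        total.forEach((key, cnt) -> {
            if (cnt[0] > 0 && cnt[1] == 0)
                res.add(key);
        });

        return res;
    }

    public static void main(String[] args) {
        // Node A holds the whole SINGLE table (set 0) plus part of the RANDOM table (set 1).
        Map<String, int[]> nodeA = map(List.of(List.of("a", "b", "c", "d"), List.of("b")));

        // Node B holds none of the SINGLE table and the rest of the RANDOM table.
        Map<String, int[]> nodeB = map(List.of(List.<String>of(), List.of("d", "x")));

        System.out.println(reduce(List.of(nodeA, nodeB))); // prints [a, c]
    }
}
```

Feeding all the same rows through one `map` call on a single node also yields `[a, c]`; that independence from row placement is what would let the MAP phase consume the SINGLE table directly, without the extra `random` exchange.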







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640654620



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Abstract execution node for set operators (EXCEPT, INTERSECT).
+ */
+public abstract class AbstractSetOpNode<Row> extends AbstractNode<Row> {
+    /** */
+    private final AggregateType type;
+
+    /** */
+    private final Grouping<Row> grouping;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waiting;
+
+    /** Current source index. */
+    private int curSrcIdx;
+
+    /** */
+    private boolean inLoop;
+
+    /** */
+    protected AbstractSetOpNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, Grouping<Row> grouping) {
+        super(ctx, rowType);
+
+        this.type = type;
+        this.grouping = grouping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources());
+        assert rowsCnt > 0 && requested == 0;
+        assert waiting <= 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (waiting == 0)
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+        else if (!inLoop)
+            context().execute(this::flush, this::onError);
+    }
+
+    /** */
+    public void push(Row row, int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        grouping.add(row, idx);
+
+        if (waiting == 0)
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+    }
+
+    /** */
+    public void end(int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+        assert curSrcIdx == idx;
+
+        checkState();
+
+        grouping.endOfSet(idx);
+
+        if (type == AggregateType.SINGLE && grouping.isEmpty())
+            curSrcIdx = sources().size(); // Skip subsequent sources.
+        else
+            curSrcIdx++;
+
+        if (curSrcIdx >= sources().size()) {
+            waiting = -1;
+
+            flush();
+        }
+        else
+            sources().get(curSrcIdx).request(waiting);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+        grouping.groups.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        return new Downstream<Row>() {
+            @Override public void push(Row row) throws Exception {
+                AbstractSetOpNode.this.push(row, idx);
+            }
+
+            @Override public void end() throws Exception {
+                AbstractSetOpNode.this.end(idx);
+            }
+
+            @Override public void onError(Throwable e) {
+                AbstractSetOpNode.this.onError(e);
+            }
+        };
+    }
+
+    /** */
+    private void flush() throws Exception {
+        if (isClosed())
+            return;
+
+        checkState();
+
+        assert waiting == -1;
+
+        int processed = 0;
+
+        inLoop = true;
+
+        try {
+            while (requested > 0 && !grouping.isEmpty()) {
+                int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
+
+                for (Row row : grouping.getRows(toSnd)) {
+                    checkState();

Review comment:
       It looks like almost all nodes already have this check in their `push` method, so I removed it here.







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640575050



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Abstract execution node for set operators (EXCEPT, INTERSECT).
+ */
+public abstract class AbstractSetOpNode<Row> extends AbstractNode<Row> {
+    /** */
+    private final AggregateType type;
+
+    /** */
+    private final Grouping<Row> grouping;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waiting;
+
+    /** Current source index. */
+    private int curSrcIdx;
+
+    /** */
+    private boolean inLoop;
+
+    /** */
+    protected AbstractSetOpNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, Grouping<Row> grouping) {
+        super(ctx, rowType);
+
+        this.type = type;
+        this.grouping = grouping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources());
+        assert rowsCnt > 0 && requested == 0;
+        assert waiting <= 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (waiting == 0)
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+        else if (!inLoop)
+            context().execute(this::flush, this::onError);
+    }
+
+    /** */
+    public void push(Row row, int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        grouping.add(row, idx);
+
+        if (waiting == 0)
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+    }
+
+    /** */
+    public void end(int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+        assert curSrcIdx == idx;
+
+        checkState();
+
+        grouping.endOfSet(idx);
+
+        if (type == AggregateType.SINGLE && grouping.isEmpty())
+            curSrcIdx = sources().size(); // Skip subsequent sources.
+        else
+            curSrcIdx++;
+
+        if (curSrcIdx >= sources().size()) {
+            waiting = -1;
+
+            flush();
+        }
+        else
+            sources().get(curSrcIdx).request(waiting);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+        grouping.groups.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        return new Downstream<Row>() {
+            @Override public void push(Row row) throws Exception {
+                AbstractSetOpNode.this.push(row, idx);
+            }
+
+            @Override public void end() throws Exception {
+                AbstractSetOpNode.this.end(idx);
+            }
+
+            @Override public void onError(Throwable e) {
+                AbstractSetOpNode.this.onError(e);
+            }
+        };
+    }
+
+    /** */
+    private void flush() throws Exception {
+        if (isClosed())
+            return;
+
+        checkState();
+
+        assert waiting == -1;
+
+        int processed = 0;
+
+        inLoop = true;
+
+        try {
+            while (requested > 0 && !grouping.isEmpty()) {
+                int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
+
+                for (Row row : grouping.getRows(toSnd)) {
+                    checkState();

Review comment:
       Probably we could omit this check on every iteration, or reduce it to just `checkCancelled()` and perform it only once per dozen rows.
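The suggested alternative (check only for cancellation, and only every dozen rows or so) can be sketched as a standalone loop. This is an illustration under assumptions: `checkCancelled`, `CHECK_INTERVAL` and the `BooleanSupplier` flag are hypothetical stand-ins for the `checkCancelled()` the comment mentions, not Ignite's `AbstractNode` API, and the interval of 12 simply takes "once per dozen rows" literally.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/** Hypothetical sketch: push rows downstream, checking for cancellation only periodically. */
public class PeriodicCancelCheckSketch {
    /** Assumed interval, taken from "once per dozen rows". */
    static final int CHECK_INTERVAL = 12;

    /** Number of times the cancellation check has run (for illustration only). */
    static int checks = 0;

    /** Stand-in for a cheap cancellation-only check. */
    static void checkCancelled(BooleanSupplier cancelled) {
        checks++;

        if (cancelled.getAsBoolean())
            throw new IllegalStateException("Query was cancelled");
    }

    /** Pushes up to {@code requested} rows, checking cancellation every CHECK_INTERVAL rows. */
    static int flush(Iterator<String> rows, int requested, Consumer<String> downstream,
        BooleanSupplier cancelled) {
        int processed = 0;

        while (processed < requested && rows.hasNext()) {
            if (processed % CHECK_INTERVAL == 0)
                checkCancelled(cancelled);

            downstream.accept(rows.next());

            processed++;
        }

        return processed;
    }

    public static void main(String[] args) {
        List<String> src = new ArrayList<>();

        for (int i = 0; i < 100; i++)
            src.add("row" + i);

        List<String> out = new ArrayList<>();

        int sent = flush(src.iterator(), 50, out::add, () -> false);

        System.out.println(sent + " rows sent, " + checks + " checks"); // prints 50 rows sent, 5 checks
    }
}
```

With 50 rows requested, the check runs before rows 0, 12, 24, 36 and 48, i.e. 5 times instead of 50; the trade-off is that a cancellation may be noticed up to `CHECK_INTERVAL - 1` rows later.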







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640687455



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MAP phase of set op (MINUS, INTERSECT).
+ */
+public interface IgniteMapSetOp extends IgniteSetOp {
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean rewindable = inputTraits.stream()
+            .map(TraitUtils::rewindability)
+            .allMatch(RewindabilityTrait::rewindable);
+
+        if (rewindable)
+            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
+            return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
+
+        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
+            return ImmutableList.of(); // Mixing of single and random is prohibited.

Review comment:
       > come to the conclusion that only random distribution should be allowed for input nodes
   
   My suggestion doesn't violate this. You could require a RANDOM distribution for the input nodes; for a SINGLE-distributed child, an Exchange with a random distribution function would then be created. For a BROADCAST input, we could require any HASH distribution, so a TrimExchange would be created.
   
   Does it make any sense? 
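Why a BROADCAST input needs a HASH requirement (and hence a TrimExchange) before it can feed a MAP phase that expects RANDOM input: every node holds a full copy, so without trimming each row would be counted once per node, which would skew the per-set counts the reducer relies on (for the ALL variants in particular). Assuming the trim step keeps, on each node, only the rows whose hash maps to that node, every row survives on exactly one node. The sketch shows that property; `trim` and the `floorMod(hashCode())` partitioning are hypothetical, not Ignite's actual distribution function.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of trimming a BROADCAST (fully replicated) input to a hash partitioning. */
public class TrimSketch {
    /** Keeps only the rows whose hash maps to {@code nodeIdx}; assumed behaviour of a trim step. */
    static List<String> trim(List<String> replicatedRows, int nodeIdx, int nodeCnt) {
        List<String> res = new ArrayList<>();

        for (String row : replicatedRows) {
            // floorMod keeps the partition index in [0, nodeCnt) even for negative hash codes.
            if (Math.floorMod(row.hashCode(), nodeCnt) == nodeIdx)
                res.add(row);
        }

        return res;
    }

    public static void main(String[] args) {
        List<String> broadcast = List.of("a", "b", "c", "d", "e");
        int nodeCnt = 3;

        // Every node sees the full copy, but after trimming each row survives on exactly one node.
        List<String> union = new ArrayList<>();

        for (int nodeIdx = 0; nodeIdx < nodeCnt; nodeIdx++)
            union.addAll(trim(broadcast, nodeIdx, nodeCnt));

        Collections.sort(union);

        System.out.println(union); // prints [a, b, c, d, e]
    }
}
```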







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640713539



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MAP phase of set op (MINUS, INTERSECT).
+ */
+public interface IgniteMapSetOp extends IgniteSetOp {
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean rewindable = inputTraits.stream()
+            .map(TraitUtils::rewindability)
+            .allMatch(RewindabilityTrait::rewindable);
+
+        if (rewindable)
+            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
+            return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
+
+        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
+            return ImmutableList.of(); // Mixing of single and random is prohibited.

Review comment:
       So currently the test `testSetOpSingleAndRandom()` produces a plan like this:
   ```
   IgniteSingleMinus(all=[false])
     IgniteTableScan(table=[[PUBLIC, SINGLE_TBL1]])
     IgniteExchange(distribution=[single])
       IgniteTableScan(table=[[PUBLIC, RANDOM_TBL1]])
   ```
   
   And the following plan should be possible too:
   ```
   IgniteReduceMinus(all=[false])
     IgniteExchange(distribution=[single])
       IgniteMapMinus(all=[false])
         IgniteExchange(distribution=[random])
           IgniteTableScan(table=[[PUBLIC, SINGLE_TBL1]])
         IgniteTableScan(table=[[PUBLIC, RANDOM_TBL1]])
   ```







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640586896



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
##########
@@ -17,246 +17,38 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.util.typedef.F;
 
 /**
  * Execution node for MINUS (EXCEPT) operator.
  */
-public class MinusNode<Row> extends AbstractNode<Row> {
+public class MinusNode<Row> extends AbstractSetOpNode<Row> {
     /** */
-    private final AggregateType type;
-
-    /** */
-    private final boolean all;
-
-    /** */
-    private final RowFactory<Row> rowFactory;
-
-    /** */
-    private final Grouping grouping;
-
-    /** */
-    private int requested;
-
-    /** */
-    private int waiting;
-
-    /** Current source index. */
-    private int curSrcIdx;
-
-    /** */
-    private boolean inLoop;
-
-    /**
-     * @param ctx Execution context.
-     */
     public MinusNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
         RowFactory<Row> rowFactory) {
-        super(ctx, rowType);
-
-        this.all = all;
-        this.type = type;
-        this.rowFactory = rowFactory;
-
-        grouping = new Grouping();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void request(int rowsCnt) throws Exception {
-        assert !F.isEmpty(sources());
-        assert rowsCnt > 0 && requested == 0;
-        assert waiting <= 0;
-
-        checkState();
-
-        requested = rowsCnt;
-
-        if (waiting == 0)
-            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
-        else if (!inLoop)
-            context().execute(this::flush, this::onError);
-    }
-
-    /** */
-    public void push(Row row, int idx) throws Exception {
-        assert downstream() != null;
-        assert waiting > 0;
-
-        checkState();
-
-        waiting--;
-
-        grouping.add(row, idx);
-
-        if (waiting == 0)
-            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+        super(ctx, rowType, type, all, rowFactory, new MinusGrouping<>(ctx, rowFactory, type, all));
     }
 
     /** */
-    public void end(int idx) throws Exception {
-        assert downstream() != null;
-        assert waiting > 0;
-        assert curSrcIdx == idx;
-
-        checkState();
-
-        if (type == AggregateType.SINGLE && grouping.isEmpty())
-            curSrcIdx = sources().size(); // Skip subsequent sources.
-        else
-            curSrcIdx++;
-
-        if (curSrcIdx >= sources().size()) {
-            waiting = -1;
-
-            flush();
-        }
-        else
-            sources().get(curSrcIdx).request(waiting);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void rewindInternal() {
-        requested = 0;
-        waiting = 0;
-        grouping.groups.clear();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Downstream<Row> requestDownstream(int idx) {
-        return new Downstream<Row>() {
-            @Override public void push(Row row) throws Exception {
-                MinusNode.this.push(row, idx);
-            }
-
-            @Override public void end() throws Exception {
-                MinusNode.this.end(idx);
-            }
-
-            @Override public void onError(Throwable e) {
-                MinusNode.this.onError(e);
-            }
-        };
-    }
-
-    /** */
-    private void flush() throws Exception {
-        if (isClosed())
-            return;
-
-        checkState();
-
-        assert waiting == -1;
-
-        int processed = 0;
-
-        inLoop = true;
-
-        try {
-            while (requested > 0 && !grouping.isEmpty()) {
-                int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
-
-                for (Row row : grouping.getRows(toSnd)) {
-                    checkState();
-
-                    requested--;
-                    downstream().push(row);
-
-                    processed++;
-                }
-
-                if (processed >= IN_BUFFER_SIZE && requested > 0) {
-                    // Allow others to do their job.
-                    context().execute(this::flush, this::onError);
-
-                    return;
-                }
-            }
-        }
-        finally {
-            inLoop = false;
-        }
-
-        if (requested > 0) {
-            requested = 0;
-
-            downstream().end();
-        }
-    }
-
-    /** */
-    private class Grouping {
-        /**
-         * Value in this map will always have 2 elements, first - count of keys in the first set, second - count of
-         * keys in all sets except first.
-         */
-        private final Map<GroupKey, int[]> groups = new HashMap<>();
-
-        /** */
-        private final RowHandler<Row> hnd;
-
-        /** */
-        private Grouping() {
-            hnd = context().rowHandler();
-        }
-
+    private static class MinusGrouping<Row> extends Grouping<Row> {
         /** */
-        private void add(Row row, int setIdx) {
-            if (type == AggregateType.REDUCE) {
-                assert setIdx == 0 : "Unexpected set index: " + setIdx;
-
-                addOnReducer(row);
-            }
-            else if (type == AggregateType.MAP)
-                addOnMapper(row, setIdx);
-            else
-                addOnSingle(row, setIdx);
-        }
-
-        /**
-         * @param cnt Number of rows.
-         *
-         * @return Actually sent rows number.
-         */
-        private List<Row> getRows(int cnt) {
-            if (F.isEmpty(groups))
-                return Collections.emptyList();
-            else if (type == AggregateType.MAP)
-                return getOnMapper(cnt);
-            else
-                return getOnSingleOrReducer(cnt);
+        private MinusGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type, boolean all) {
+            super(ctx, rowFactory, type, all);
         }
 
         /** */

Review comment:
       ```suggestion
           /** {@inheritDoc} */
   ```
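A minimal standalone sketch of the two-counter scheme described in the removed `Grouping` javadoc above. The first counter holds occurrences in the first input, and the second holds occurrences in all other inputs. It assumes single-phase execution over in-memory lists. `MinusSketch` and `except` are hypothetical names, not Ignite API. Emitting max(c0 - c1, 0) copies for `EXCEPT ALL` is an assumption based on standard SQL multiset semantics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of MINUS (EXCEPT) counting; not Ignite code. */
class MinusSketch {
    /** Builds key -> {count in first input, count in all other inputs}. */
    static <T> Map<T, int[]> count(List<List<T>> inputs) {
        Map<T, int[]> groups = new LinkedHashMap<>();

        for (int setIdx = 0; setIdx < inputs.size(); setIdx++) {
            for (T row : inputs.get(setIdx)) {
                if (setIdx == 0)
                    groups.computeIfAbsent(row, k -> new int[2])[0]++;
                else {
                    int[] cntrs = groups.get(row);

                    // Rows absent from the first input can never be emitted, so they are not tracked.
                    if (cntrs != null)
                        cntrs[1]++;
                }
            }
        }

        return groups;
    }

    /** EXCEPT: a key is emitted once if it never occurs in the other inputs; ALL emits max(c0 - c1, 0) copies. */
    static <T> List<T> except(List<List<T>> inputs, boolean all) {
        List<T> res = new ArrayList<>();

        for (Map.Entry<T, int[]> e : count(inputs).entrySet()) {
            int[] c = e.getValue();
            int copies = all ? Math.max(c[0] - c[1], 0) : (c[1] == 0 ? 1 : 0);

            for (int i = 0; i < copies; i++)
                res.add(e.getKey());
        }

        return res;
    }

    public static void main(String[] args) {
        List<List<Integer>> inputs = List.of(List.of(1, 1, 1, 2, 3), List.of(1, 3), List.of(3));

        System.out.println(except(inputs, false)); // [2]
        System.out.println(except(inputs, true));  // [1, 1, 2]
    }
}
```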




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640513400



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceIntersect.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+
+/**
+ * Physical node for REDUCE phase of INTERSECT operator.
+ */
+public class IgniteReduceIntersect extends IgniteIntersect implements IgniteReduceSetOp {
+    /** */
+    public IgniteReduceIntersect(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        boolean all,
+        RelDataType rowType
+    ) {
+        super(cluster, traitSet, ImmutableList.of(input), all);
+
+        this.rowType = rowType;
+    }
+
+    /** */
+    public IgniteReduceIntersect(RelInput input) {
+        this(
+            input.getCluster(),
+            input.getTraitSet().replace(IgniteConvention.INSTANCE),
+            input.getInput(),
+            input.getBoolean("all", false),
+            input.getRowType("rowType")
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        super.explainTerms(pw)
+            .itemIf("rowType", rowType, pw.getDetailLevel() == SqlExplainLevel.ALL_ATTRIBUTES);
+
+        return pw;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+        return new IgniteReduceIntersect(getCluster(), traitSet, sole(inputs), all, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteReduceIntersect clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteReduceIntersect(cluster, getTraitSet(), sole(inputs), all, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int aggregateFieldsCount() {
+        return rowType.getFieldCount() + 2 /* At least two fields required for count aggregation. */;

Review comment:
       This doesn't look quite precise. According to `AbstractSetOpNode.Grouping#addOnReducer`, there is a `grpKey`, which is effectively the row we are going to return, and an array of ints whose length equals `cntrsMap.size()`, i.e. the number of inputs.
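The following hedged sketch makes the layout described above concrete. It shows how a reducer could merge per-node partial results keyed by the row, with one counter per input. `IntersectReduceSketch` and `reduce` are hypothetical. The emit rule is assumed from standard SQL semantics rather than taken from the diff: a row is kept only if every counter is positive, and `INTERSECT ALL` emits min(counters) copies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of the REDUCE phase of INTERSECT; not Ignite code. */
class IntersectReduceSketch {
    /**
     * Merges per-node partial results. Each partial maps a group key (the row to return)
     * to an int[] holding one counter per input.
     */
    static <T> List<T> reduce(List<Map<T, int[]>> partials, int inputsCnt, boolean all) {
        Map<T, int[]> groups = new LinkedHashMap<>();

        for (Map<T, int[]> partial : partials) {
            for (Map.Entry<T, int[]> e : partial.entrySet()) {
                // Fresh array per key, so the partial inputs are never mutated.
                int[] cntrs = groups.computeIfAbsent(e.getKey(), k -> new int[inputsCnt]);

                for (int i = 0; i < inputsCnt; i++)
                    cntrs[i] += e.getValue()[i];
            }
        }

        List<T> res = new ArrayList<>();

        for (Map.Entry<T, int[]> e : groups.entrySet()) {
            int min = Integer.MAX_VALUE;

            for (int c : e.getValue())
                min = Math.min(min, c);

            // A row survives only if every input contributed it; ALL keeps min(count) copies.
            int copies = all ? min : Math.min(min, 1);

            for (int i = 0; i < copies; i++)
                res.add(e.getKey());
        }

        return res;
    }
}
```

If `rowType` in `IgniteReduceIntersect` is the output row type, the per-key state in this layout is that row plus `inputsCnt` counters. The fixed `+ 2` is therefore exact only for two inputs.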







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640586727



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
##########
@@ -274,94 +66,21 @@ private void addOnSingle(Row row, int setIdx) {
         }
 
         /** */

Review comment:
       ```suggestion
           /** {@inheritDoc} */
   ```







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640670389



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteReduceIntersect.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelInput;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
+import org.apache.calcite.rel.core.SetOp;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlExplainLevel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRel;
+import org.apache.ignite.internal.processors.query.calcite.rel.IgniteRelVisitor;
+
+/**
+ * Physical node for REDUCE phase of INTERSECT operator.
+ */
+public class IgniteReduceIntersect extends IgniteIntersect implements IgniteReduceSetOp {
+    /** */
+    public IgniteReduceIntersect(
+        RelOptCluster cluster,
+        RelTraitSet traitSet,
+        RelNode input,
+        boolean all,
+        RelDataType rowType
+    ) {
+        super(cluster, traitSet, ImmutableList.of(input), all);
+
+        this.rowType = rowType;
+    }
+
+    /** */
+    public IgniteReduceIntersect(RelInput input) {
+        this(
+            input.getCluster(),
+            input.getTraitSet().replace(IgniteConvention.INSTANCE),
+            input.getInput(),
+            input.getBoolean("all", false),
+            input.getRowType("rowType")
+        );
+    }
+
+    /** {@inheritDoc} */
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        super.explainTerms(pw)
+            .itemIf("rowType", rowType, pw.getDetailLevel() == SqlExplainLevel.ALL_ATTRIBUTES);
+
+        return pw;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SetOp copy(RelTraitSet traitSet, List<RelNode> inputs, boolean all) {
+        return new IgniteReduceIntersect(getCluster(), traitSet, sole(inputs), all, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteReduceIntersect clone(RelOptCluster cluster, List<IgniteRel> inputs) {
+        return new IgniteReduceIntersect(cluster, getTraitSet(), sole(inputs), all, rowType);
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T accept(IgniteRelVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int aggregateFieldsCount() {
+        return rowType.getFieldCount() + 2 /* At least two fields required for count aggregation. */;

Review comment:
       I don't think an exact field count is that important here; an approximation is enough (an exact calculation would require passing the inputs count to the constructor and storing it just for this purpose). The estimate is already very rough, so an exact counter size wouldn't make the memory cost precise anyway.







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640587410



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+
+/**
+ * Execution node for INTERSECT operator.
+ */
+public class IntersectNode<Row> extends AbstractSetOpNode<Row> {
+    /** */
+    public IntersectNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, int inputsCnt) {
+        super(ctx, rowType, type, all, rowFactory, new IntersectGrouping<>(ctx, rowFactory, type, all, inputsCnt));
+    }
+
+    /** */
+    private static class IntersectGrouping<Row> extends Grouping<Row> {
+        /** Inputs count. */
+        private final int inputsCnt;
+
+        /** */
+        private IntersectGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type,
+            boolean all, int inputsCnt) {
+            super(ctx, rowFactory, type, all);
+
+            this.inputsCnt = inputsCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void endOfSet(int setIdx) {
+            if (type == AggregateType.SINGLE && rowsCnt == 0)
+                groups.clear();
+
+            super.endOfSet(setIdx);
+        }
+
+        /** */

Review comment:
       ```suggestion
           /** {@inheritDoc} */
   ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+
+/**
+ * Execution node for INTERSECT operator.
+ */
+public class IntersectNode<Row> extends AbstractSetOpNode<Row> {
+    /** */
+    public IntersectNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, int inputsCnt) {
+        super(ctx, rowType, type, all, rowFactory, new IntersectGrouping<>(ctx, rowFactory, type, all, inputsCnt));
+    }
+
+    /** */
+    private static class IntersectGrouping<Row> extends Grouping<Row> {
+        /** Inputs count. */
+        private final int inputsCnt;
+
+        /** */
+        private IntersectGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type,
+            boolean all, int inputsCnt) {
+            super(ctx, rowFactory, type, all);
+
+            this.inputsCnt = inputsCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void endOfSet(int setIdx) {
+            if (type == AggregateType.SINGLE && rowsCnt == 0)
+                groups.clear();
+
+            super.endOfSet(setIdx);
+        }
+
+        /** */
+        @Override protected void addOnSingle(Row row, int setIdx) {
+            int[] cntrs;
+
+            GroupKey key = key(row);
+
+            if (setIdx == 0) {
+                cntrs = groups.computeIfAbsent(key, k -> new int[inputsCnt]);
+
+                cntrs[0]++;
+            }
+            else {
+                cntrs = groups.get(key);
+
+                if (cntrs != null) {
+                    if (cntrs[setIdx - 1] == 0)
+                        groups.remove(key);
+                    else
+                        cntrs[setIdx]++;
+                }
+            }
+        }
+
+        /** */

Review comment:
       ```suggestion
           /** {@inheritDoc} */
   ```
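Here is a standalone sketch of the single-phase counting that `IntersectGrouping#addOnSingle` above appears to implement. The first input creates a counter array with one slot per input. A later input increments a key only if the previous input also contained it; otherwise the key is dropped. The diff excerpt does not show how rows are emitted, so the final step is an assumption: a key is emitted only if every counter is positive, and `INTERSECT ALL` emits min(counters) copies. `IntersectSingleSketch` is a hypothetical name. In this sketch an empty input simply leaves its counter at 0, so nothing is emitted. The early `groups.clear()` in `endOfSet` seems to reach the same result sooner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-phase INTERSECT counting, modelled on addOnSingle; not Ignite code. */
class IntersectSingleSketch {
    static <T> List<T> intersect(List<List<T>> inputs, boolean all) {
        int inputsCnt = inputs.size();
        Map<T, int[]> groups = new LinkedHashMap<>();

        for (int setIdx = 0; setIdx < inputsCnt; setIdx++) {
            for (T row : inputs.get(setIdx)) {
                if (setIdx == 0)
                    groups.computeIfAbsent(row, k -> new int[inputsCnt])[0]++;
                else {
                    int[] cntrs = groups.get(row);

                    if (cntrs != null) {
                        // Missing from the previous input: the row can no longer be part of the result.
                        if (cntrs[setIdx - 1] == 0)
                            groups.remove(row);
                        else
                            cntrs[setIdx]++;
                    }
                }
            }
        }

        List<T> res = new ArrayList<>();

        for (Map.Entry<T, int[]> e : groups.entrySet()) {
            // Assumed emit rule: min over all inputs (0 if any input lacked the row).
            int min = Arrays.stream(e.getValue()).min().getAsInt();
            int copies = all ? min : Math.min(min, 1);

            for (int i = 0; i < copies; i++)
                res.add(e.getKey());
        }

        return res;
    }
}
```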







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640676542



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MAP phase of set op (MINUS, INTERSECT).
+ */
+public interface IgniteMapSetOp extends IgniteSetOp {
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean rewindable = inputTraits.stream()
+            .map(TraitUtils::rewindability)
+            .allMatch(RewindabilityTrait::rewindable);
+
+        if (rewindable)
+            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
+            return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
+
+        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
+            return ImmutableList.of(); // Mixing of single and random is prohibited.

Review comment:
       I think we discussed this here https://github.com/apache/ignite/pull/9009#discussion_r617440209 and came to the conclusion that only random distribution should be allowed for input nodes?
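This hedged illustration contrasts the two early-return checks above with the rule from the linked discussion, under which MAP/REDUCE applies only to all-random inputs. `Distribution` is a simplified stand-in enum, not Ignite's trait API. `BROADCAST` is treated as satisfying `SINGLE`, mirroring the code comment. Any input combination not rejected by the two early returns is treated as accepted, since the rest of `deriveDistribution` is not shown.

```java
import java.util.List;

/** Hypothetical comparison of two input-distribution rules for the MAP phase; not Ignite code. */
class MapPhaseRuleSketch {
    /** Simplified stand-in for distribution kinds (assumption). */
    enum Distribution { SINGLE, BROADCAST, RANDOM, HASH }

    /** Mirrors the two early returns: all single-like inputs, or any exactly SINGLE input, is rejected. */
    static boolean acceptedByCurrentChecks(List<Distribution> inputs) {
        boolean allSingleLike = inputs.stream()
            .allMatch(d -> d == Distribution.SINGLE || d == Distribution.BROADCAST);
        boolean anySingle = inputs.stream().anyMatch(d -> d == Distribution.SINGLE);

        return !allSingleLike && !anySingle;
    }

    /** Rule proposed in the review: every input must be RANDOM. */
    static boolean acceptedByRandomOnlyRule(List<Distribution> inputs) {
        return !inputs.isEmpty() && inputs.stream().allMatch(d -> d == Distribution.RANDOM);
    }

    public static void main(String[] args) {
        List<Distribution> mixed = List.of(Distribution.RANDOM, Distribution.BROADCAST);

        // The two rules disagree on a RANDOM + BROADCAST combination.
        System.out.println(acceptedByCurrentChecks(mixed));  // true
        System.out.println(acceptedByRandomOnlyRule(mixed)); // false
    }
}
```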







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640652781



##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
##########
@@ -852,6 +876,112 @@ public void testExceptBigBatch() throws Exception {
         assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
         assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
         assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(2, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(3)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT ALL SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(136, rows.size());
+        assertEquals(8, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(3)));
+    }
+
+    /** */
+    @Test
+    public void testIntersect() throws Exception {

Review comment:
       Fixed







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640671410



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
##########
@@ -274,94 +66,21 @@ private void addOnSingle(Row row, int setIdx) {
         }

Review comment:
       Sorry if I didn't make myself clear.







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640652319



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
##########
@@ -97,4 +73,7 @@ protected IgniteMinusBase(RelInput input) {
 
         return costFactory.makeCost(rows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);

Review comment:
       Fixed







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640560200



##########
File path: modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
##########
@@ -852,6 +876,112 @@ public void testExceptBigBatch() throws Exception {
         assertEquals(128, F.size(rows, r -> r.get(0).equals(2)));
         assertEquals(1920, F.size(rows, r -> r.get(0).equals(3)));
         assertEquals(32768, F.size(rows, r -> r.get(0).equals(4)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(2, rows.size());
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(1, F.size(rows, r -> r.get(0).equals(3)));
+
+        rows = sql("SELECT _val FROM \"cache1Replicated\".table1_repl INTERSECT ALL SELECT _val FROM \"cache2Replicated\"" +
+            ".table2_repl");
+
+        assertEquals(136, rows.size());
+        assertEquals(8, F.size(rows, r -> r.get(0).equals(1)));
+        assertEquals(128, F.size(rows, r -> r.get(0).equals(3)));
+    }
+
+    /** */
+    @Test
+    public void testIntersect() throws Exception {

Review comment:
       I would prefer not to extend CalciteQueryProcessorTest, since it's already quite messy and most of its tests should be revised. Let's move these newly created tests to a new class named SetOpIntegrationTest.







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640652561



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
##########
@@ -19,71 +19,47 @@
 
 import java.util.List;
 import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
- * Base class for physical MINUS (EXCEPT) set op.
+ * Base interface for physical set op node (MINUS, INTERSECT).
  */
-public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgniteRel {
-    /** Count of counter fields used to aggregate results. */
-    protected static final int COUNTER_FIELDS_CNT = 2;
-
-    /** */
-    IgniteMinusBase(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
-        super(cluster, traits, inputs, all);
-    }
-
-    /** {@inheritDoc} */
-    protected IgniteMinusBase(RelInput input) {
-        super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
-    }
+public interface IgniteSetOp extends TraitsAwareIgniteRel {
+    /** ALL flag of set op. */
+    public boolean all();
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits) {
         // Operation erases collation.
         return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
             Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY)));
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits) {
         // Operation erases collation.
         return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
             Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY))));
     }
 
     /** Gets count of fields for aggregation for this node. Required for memory consumption calculation. */
-    protected abstract int aggregateFieldsCount();
-
-    /** {@inheritDoc} */
-    @Override public double estimateRowCount(RelMetadataQuery mq) {
-        final List<RelNode> inputs = getInputs();
-
-        double rows = mq.getRowCount(inputs.get(0));
-
-        for (int i = 1; i < inputs.size(); i++)
-            rows -= 0.5 * Math.min(rows, mq.getRowCount(inputs.get(i)));
-
-        return rows;
-    }
+    public int aggregateFieldsCount();
 
     /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    @Override public default RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {

Review comment:
       Fixed
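The diff above removes `estimateRowCount` from the former `IgniteMinusBase`. Where the estimate lives after the refactoring is not visible in this excerpt. The heuristic works like this: start from the first input's row count, then for each further input subtract half of the smaller of the running estimate and that input's row count. The standalone sketch below reproduces that arithmetic. The class and method names are hypothetical.

```java
/**
 * Hypothetical standalone reproduction of the row-count heuristic from the removed
 * IgniteMinusBase#estimateRowCount (names are illustrative, not Ignite API).
 */
final class MinusRowCountSketch {
    private MinusRowCountSketch() {
    }

    /**
     * @param inputRowCounts Row counts of the MINUS inputs, first input first.
     * @return Estimated number of output rows.
     */
    static double estimateRowCount(double... inputRowCounts) {
        double rows = inputRowCounts[0];

        // Each subsequent input is assumed to remove at most half of what is left.
        for (int i = 1; i < inputRowCounts.length; i++)
            rows -= 0.5 * Math.min(rows, inputRowCounts[i]);

        return rows;
    }
}
```

For inputs of 100 and 40 rows this gives 100 - 0.5 * 40 = 80. Because each step subtracts at most half of the running value, the estimate never drops below zero.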




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r642367174



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MAP phase of set op (MINUS, INTERSECT).
+ */
+public interface IgniteMapSetOp extends IgniteSetOp {
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean rewindable = inputTraits.stream()
+            .map(TraitUtils::rewindability)
+            .allMatch(RewindabilityTrait::rewindable);
+
+        if (rewindable)
+            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
+            return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
+
+        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
+            return ImmutableList.of(); // Mixing of single and random is prohibited.

Review comment:
       Fixed
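The two early returns in `deriveDistribution` encode two rules. First, if every input already satisfies `single()`, no MAP/REDUCE variant is derived. Second, an input that is exactly `single` must not be mixed with distributed inputs. Below is a simplified model of just those two checks. `Dist` and the method names are hypothetical stand-ins, not Ignite's distribution API. Treating broadcast as satisfying single is taken from the code comment, not verified against Ignite.

```java
import java.util.List;

/**
 * Hypothetical, simplified model of the two early returns in IgniteMapSetOp#deriveDistribution.
 * Dist is a stand-in for Ignite's distribution traits, not the real API.
 */
final class MapSetOpDistributionSketch {
    /** Simplified distribution kinds. */
    enum Dist { SINGLE, BROADCAST, RANDOM, HASH }

    private MapSetOpDistributionSketch() {
    }

    /** Assumption based on the code comment: single and broadcast inputs both satisfy single(). */
    static boolean satisfiesSingle(Dist d) {
        return d == Dist.SINGLE || d == Dist.BROADCAST;
    }

    /** Returns false where the diff's early returns yield an empty list (no MAP phase derived). */
    static boolean mapPhaseApplicable(List<Dist> inputs) {
        // Every input already satisfies single: the single-phase set op should be used instead.
        if (inputs.stream().allMatch(MapSetOpDistributionSketch::satisfiesSingle))
            return false;

        // At least one input is exactly single while another is distributed: mixing is prohibited.
        if (inputs.stream().anyMatch(d -> d == Dist.SINGLE))
            return false;

        return true;
    }
}
```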







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640653281



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Abstract execution node for set operators (EXCEPT, INTERSECT).
+ */
+public abstract class AbstractSetOpNode<Row> extends AbstractNode<Row> {
+    /** */
+    private final AggregateType type;
+
+    /** */
+    private final Grouping<Row> grouping;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waiting;
+
+    /** Current source index. */
+    private int curSrcIdx;
+
+    /** */
+    private boolean inLoop;
+
+    /** */
+    protected AbstractSetOpNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, Grouping<Row> grouping) {
+        super(ctx, rowType);
+
+        this.type = type;
+        this.grouping = grouping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources());
+        assert rowsCnt > 0 && requested == 0;
+        assert waiting <= 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (waiting == 0)
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+        else if (!inLoop)
+            context().execute(this::flush, this::onError);
+    }
+
+    /** */
+    public void push(Row row, int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        grouping.add(row, idx);
+
+        if (waiting == 0)
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+    }
+
+    /** */
+    public void end(int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+        assert curSrcIdx == idx;
+
+        checkState();
+
+        grouping.endOfSet(idx);
+
+        if (type == AggregateType.SINGLE && grouping.isEmpty())
+            curSrcIdx = sources().size(); // Skip subsequent sources.
+        else
+            curSrcIdx++;
+
+        if (curSrcIdx >= sources().size()) {
+            waiting = -1;
+
+            flush();
+        }
+        else
+            sources().get(curSrcIdx).request(waiting);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+        grouping.groups.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        return new Downstream<Row>() {
+            @Override public void push(Row row) throws Exception {
+                AbstractSetOpNode.this.push(row, idx);
+            }
+
+            @Override public void end() throws Exception {
+                AbstractSetOpNode.this.end(idx);
+            }
+
+            @Override public void onError(Throwable e) {
+                AbstractSetOpNode.this.onError(e);
+            }
+        };
+    }
+
+    /** */
+    private void flush() throws Exception {
+        if (isClosed())
+            return;
+
+        checkState();
+
+        assert waiting == -1;
+
+        int processed = 0;
+
+        inLoop = true;
+
+        try {
+            while (requested > 0 && !grouping.isEmpty()) {

Review comment:
       Looks like only one iteration is possible. Changed to `if`.
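Why one pass is enough: each pass sends `toSnd = min(requested, IN_BUFFER_SIZE - processed)` rows. This assumes `grouping.getRows(toSnd)` returns fewer rows only when the grouping is exhausted. After the pass, one of three things holds. The grouping is empty, or `requested` has dropped to 0, or `IN_BUFFER_SIZE` rows were sent and the method reschedules itself and returns. The sketch below models a single `flush()` call under that reasoning. `FlushSketch`, its `Result` type, and the `Deque` of strings standing in for grouped rows are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical, synchronous model of one AbstractSetOpNode#flush() call after all sources have ended.
 * A Deque of strings stands in for the rows held by the grouping.
 */
final class FlushSketch {
    private FlushSketch() {
    }

    /** What a single flush() call did. */
    static final class Result {
        /** Rows pushed downstream, in order. */
        final List<String> pushed = new ArrayList<>();

        /** True if flush() would re-submit itself to the executor instead of finishing. */
        boolean rescheduled;

        /** True if downstream().end() would be called. */
        boolean ended;

        /** Rows still requested by the downstream when flush() returns. */
        int requestedLeft;
    }

    static Result flush(Deque<String> rows, int requested, int bufSize) {
        Result res = new Result();

        // A single pass: send min(requested, bufSize) rows, or fewer if rows run out.
        if (requested > 0 && !rows.isEmpty()) {
            int toSnd = Math.min(requested, bufSize);

            while (res.pushed.size() < toSnd && !rows.isEmpty()) {
                res.pushed.add(rows.poll());
                requested--;
            }

            if (res.pushed.size() >= bufSize && requested > 0) {
                // Yield to other tasks; the remaining demand is served by the next call.
                res.rescheduled = true;
                res.requestedLeft = requested;

                return res;
            }
        }

        // Demand remains but no rows are left: signal end of data.
        if (requested > 0) {
            requested = 0;
            res.ended = true;
        }

        res.requestedLeft = requested;

        return res;
    }
}
```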







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640687091



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
##########
@@ -274,94 +66,21 @@ private void addOnSingle(Row row, int setIdx) {
         }

Review comment:
       Ok, got it, thank you







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r641382373



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
##########
@@ -0,0 +1,107 @@
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MAP phase of set op (MINUS, INTERSECT).
+ */
+public interface IgniteMapSetOp extends IgniteSetOp {
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean rewindable = inputTraits.stream()
+            .map(TraitUtils::rewindability)
+            .allMatch(RewindabilityTrait::rewindable);
+
+        if (rewindable)
+            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
+            return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
+
+        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
+            return ImmutableList.of(); // Mixing of single and random is prohibited.

Review comment:
       I think this plan is not correct:
   ```
   IgniteReduceMinus(all=[false])
     IgniteExchange(distribution=[single])
       IgniteMapMinus(all=[false])
         IgniteExchange(distribution=[random])
           IgniteTableScan(table=[[PUBLIC, SINGLE_TBL1]])
         IgniteTableScan(table=[[PUBLIC, RANDOM_TBL1]])
   ```
   The extra exchange with `distribution=[random]` is redundant: no data needs to be sent anywhere here. Instead, the `single` distribution should satisfy the `random` distribution, which makes plans like this possible:
   ```
   IgniteReduceMinus(all=[false])
     IgniteExchange(distribution=[single])
       IgniteMapMinus(all=[false])
         IgniteTableScan(table=[[PUBLIC, SINGLE_TBL1]])
         IgniteTableScan(table=[[PUBLIC, RANDOM_TBL1]])
   ```
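Here is a hedged illustration of the reviewer's point. The model covers only the cases mentioned in this thread: an exact match satisfies itself, broadcast satisfies single (per the code comment in the diff), and, as proposed here, single satisfies random. Every other pair is treated as unsatisfied. `Dist`, `satisfies`, and `inputsNeedingExchange` are hypothetical names, not Ignite's `IgniteDistribution` API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the reviewer's proposal that a SINGLE input should satisfy RANDOM. */
final class ProposedSatisfiesSketch {
    /** Simplified distribution kinds (not Ignite's trait classes). */
    enum Dist { SINGLE, BROADCAST, RANDOM }

    private ProposedSatisfiesSketch() {
    }

    /** Only the relations discussed in the review are modeled; everything else is "not satisfied". */
    static boolean satisfies(Dist have, Dist want, boolean singleSatisfiesRandom) {
        if (have == want)
            return true;

        // Taken from the code comment "single or broadcast".
        if (want == Dist.SINGLE && have == Dist.BROADCAST)
            return true;

        // The proposed relaxation.
        return singleSatisfiesRandom && want == Dist.RANDOM && have == Dist.SINGLE;
    }

    /** Indexes of inputs that would need an exchange to reach the wanted distribution. */
    static List<Integer> inputsNeedingExchange(List<Dist> inputs, Dist want, boolean singleSatisfiesRandom) {
        List<Integer> res = new ArrayList<>();

        for (int i = 0; i < inputs.size(); i++) {
            if (!satisfies(inputs.get(i), want, singleSatisfiesRandom))
                res.add(i);
        }

        return res;
    }
}
```

With `singleSatisfiesRandom = false`, the `SINGLE_TBL1` input (index 0) needs an exchange, matching the first plan. With `true`, no exchange is needed, matching the second.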







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640584236



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
##########
@@ -274,94 +66,21 @@ private void addOnSingle(Row row, int setIdx) {
         }

Review comment:
       This comment should be a few rows above the current one (see line 271).
   
   Should we remove the key as soon as the first row comes from `setIdx > 0` when `all = false`?
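Background to this question: in SINGLE mode the inputs are read strictly one after another, because `AbstractSetOpNode` advances `curSrcIdx` only after the previous source ends. So once a key appears in any input after the first, EXCEPT without ALL can never emit it, and dropping the key early only frees memory. Below is a hypothetical sketch of that eager removal. It uses two counters per key, `[countInFirstSet, countInOtherSets]`, as described in the old `Grouping` javadoc. It is not the fix that went into the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical single-phase EXCEPT grouping with eager key removal for all = false.
 * Assumes all rows of set 0 arrive before any row of later sets.
 */
final class EagerMinusSketch<K> {
    /** ALL flag of the set op. */
    private final boolean all;

    /** key -> {count in first set, count in all other sets}. */
    private final Map<K, int[]> groups = new HashMap<>();

    EagerMinusSketch(boolean all) {
        this.all = all;
    }

    void add(K key, int setIdx) {
        if (setIdx == 0) {
            groups.computeIfAbsent(key, k -> new int[2])[0]++;

            return;
        }

        int[] cntrs = groups.get(key);

        if (cntrs == null)
            return; // Not in the first set: can never be part of the result.

        if (!all)
            groups.remove(key); // EXCEPT DISTINCT: one match in a later set excludes the key for good.
        else
            cntrs[1]++;
    }

    List<K> result() {
        List<K> res = new ArrayList<>();

        for (Map.Entry<K, int[]> e : groups.entrySet()) {
            // EXCEPT ALL emits (first - others) copies; a negative value emits nothing.
            int copies = all ? e.getValue()[0] - e.getValue()[1] : 1;

            for (int i = 0; i < copies; i++)
                res.add(e.getKey());
        }

        return res;
    }

    int groupCount() {
        return groups.size();
    }
}
```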







[GitHub] [ignite] alex-plekhanov closed pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov closed pull request #9095:
URL: https://github.com/apache/ignite/pull/9095


   





[GitHub] [ignite] alex-plekhanov commented on pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#issuecomment-854755628


   Merged to sql-calcite





[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640670530



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
##########
@@ -274,94 +66,21 @@ private void addOnSingle(Row row, int setIdx) {
         }

Review comment:
       I wanted to attach the question about removing the key when the first row comes from `setIdx > 0` and `all = false` to line 271. GitHub didn't allow it because that line is too far from the nearest line changed by this patch. So I attached it to line 274 and added a note saying which line it was intended for: that note is the first sentence of my comment.







[GitHub] [ignite] alex-plekhanov commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640656426



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
##########
@@ -274,94 +66,21 @@ private void addOnSingle(Row row, int setIdx) {
         }

Review comment:
       I didn't understand the part about the "comment". Can you please clarify what you mean?
   Fixed the logic for `all = false`.

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
##########
@@ -274,94 +66,21 @@ private void addOnSingle(Row row, int setIdx) {
         }
 
         /** */

Review comment:
       Fixed

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
##########
@@ -17,246 +17,38 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.rel;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
-import org.apache.ignite.internal.util.typedef.F;
 
 /**
  * Execution node for MINUS (EXCEPT) operator.
  */
-public class MinusNode<Row> extends AbstractNode<Row> {
+public class MinusNode<Row> extends AbstractSetOpNode<Row> {
     /** */
-    private final AggregateType type;
-
-    /** */
-    private final boolean all;
-
-    /** */
-    private final RowFactory<Row> rowFactory;
-
-    /** */
-    private final Grouping grouping;
-
-    /** */
-    private int requested;
-
-    /** */
-    private int waiting;
-
-    /** Current source index. */
-    private int curSrcIdx;
-
-    /** */
-    private boolean inLoop;
-
-    /**
-     * @param ctx Execution context.
-     */
     public MinusNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
         RowFactory<Row> rowFactory) {
-        super(ctx, rowType);
-
-        this.all = all;
-        this.type = type;
-        this.rowFactory = rowFactory;
-
-        grouping = new Grouping();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void request(int rowsCnt) throws Exception {
-        assert !F.isEmpty(sources());
-        assert rowsCnt > 0 && requested == 0;
-        assert waiting <= 0;
-
-        checkState();
-
-        requested = rowsCnt;
-
-        if (waiting == 0)
-            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
-        else if (!inLoop)
-            context().execute(this::flush, this::onError);
-    }
-
-    /** */
-    public void push(Row row, int idx) throws Exception {
-        assert downstream() != null;
-        assert waiting > 0;
-
-        checkState();
-
-        waiting--;
-
-        grouping.add(row, idx);
-
-        if (waiting == 0)
-            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+        super(ctx, rowType, type, all, rowFactory, new MinusGrouping<>(ctx, rowFactory, type, all));
     }
 
     /** */
-    public void end(int idx) throws Exception {
-        assert downstream() != null;
-        assert waiting > 0;
-        assert curSrcIdx == idx;
-
-        checkState();
-
-        if (type == AggregateType.SINGLE && grouping.isEmpty())
-            curSrcIdx = sources().size(); // Skip subsequent sources.
-        else
-            curSrcIdx++;
-
-        if (curSrcIdx >= sources().size()) {
-            waiting = -1;
-
-            flush();
-        }
-        else
-            sources().get(curSrcIdx).request(waiting);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void rewindInternal() {
-        requested = 0;
-        waiting = 0;
-        grouping.groups.clear();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Downstream<Row> requestDownstream(int idx) {
-        return new Downstream<Row>() {
-            @Override public void push(Row row) throws Exception {
-                MinusNode.this.push(row, idx);
-            }
-
-            @Override public void end() throws Exception {
-                MinusNode.this.end(idx);
-            }
-
-            @Override public void onError(Throwable e) {
-                MinusNode.this.onError(e);
-            }
-        };
-    }
-
-    /** */
-    private void flush() throws Exception {
-        if (isClosed())
-            return;
-
-        checkState();
-
-        assert waiting == -1;
-
-        int processed = 0;
-
-        inLoop = true;
-
-        try {
-            while (requested > 0 && !grouping.isEmpty()) {
-                int toSnd = Math.min(requested, IN_BUFFER_SIZE - processed);
-
-                for (Row row : grouping.getRows(toSnd)) {
-                    checkState();
-
-                    requested--;
-                    downstream().push(row);
-
-                    processed++;
-                }
-
-                if (processed >= IN_BUFFER_SIZE && requested > 0) {
-                    // Allow others to do their job.
-                    context().execute(this::flush, this::onError);
-
-                    return;
-                }
-            }
-        }
-        finally {
-            inLoop = false;
-        }
-
-        if (requested > 0) {
-            requested = 0;
-
-            downstream().end();
-        }
-    }
-
-    /** */
-    private class Grouping {
-        /**
-         * Value in this map will always have 2 elements, first - count of keys in the first set, second - count of
-         * keys in all sets except first.
-         */
-        private final Map<GroupKey, int[]> groups = new HashMap<>();
-
-        /** */
-        private final RowHandler<Row> hnd;
-
-        /** */
-        private Grouping() {
-            hnd = context().rowHandler();
-        }
-
+    private static class MinusGrouping<Row> extends Grouping<Row> {
         /** */
-        private void add(Row row, int setIdx) {
-            if (type == AggregateType.REDUCE) {
-                assert setIdx == 0 : "Unexpected set index: " + setIdx;
-
-                addOnReducer(row);
-            }
-            else if (type == AggregateType.MAP)
-                addOnMapper(row, setIdx);
-            else
-                addOnSingle(row, setIdx);
-        }
-
-        /**
-         * @param cnt Number of rows.
-         *
-         * @return Actually sent rows number.
-         */
-        private List<Row> getRows(int cnt) {
-            if (F.isEmpty(groups))
-                return Collections.emptyList();
-            else if (type == AggregateType.MAP)
-                return getOnMapper(cnt);
-            else
-                return getOnSingleOrReducer(cnt);
+        private MinusGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type, boolean all) {
+            super(ctx, rowFactory, type, all);
         }
 
         /** */

Review comment:
       Fixed
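After this refactoring, `MinusNode` only supplies a `MinusGrouping` to `AbstractSetOpNode`, which handles the request/push/end protocol for every set op. For the MAP/REDUCE split, the old `Grouping` javadoc describes two counters per key: rows in the first set and rows in all other sets. The sketch below shows how such counters could be computed per node and merged on the reducer. Names are hypothetical, and this is not the PR's `addOnMapper`/`addOnReducer` code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical two-phase EXCEPT: MAP computes local counters, REDUCE sums them and applies the final rule. */
final class TwoPhaseMinusSketch {
    private TwoPhaseMinusSketch() {
    }

    /** MAP phase on one node; sets.get(0) is the first input. Returns key -> {firstSetCnt, otherSetsCnt}. */
    static <K> Map<K, int[]> map(List<List<K>> sets) {
        Map<K, int[]> groups = new HashMap<>();

        for (int setIdx = 0; setIdx < sets.size(); setIdx++) {
            for (K key : sets.get(setIdx)) {
                // Keys seen only in later sets are kept too: the reducer needs them to exclude
                // rows that other nodes report in the first set.
                int[] cntrs = groups.computeIfAbsent(key, k -> new int[2]);

                cntrs[setIdx == 0 ? 0 : 1]++;
            }
        }

        return groups;
    }

    /** REDUCE phase: sums counters from all mappers and emits the EXCEPT / EXCEPT ALL result. */
    static <K> List<K> reduce(List<Map<K, int[]>> mapOutputs, boolean all) {
        Map<K, int[]> total = new HashMap<>();

        for (Map<K, int[]> out : mapOutputs) {
            for (Map.Entry<K, int[]> e : out.entrySet()) {
                int[] acc = total.computeIfAbsent(e.getKey(), k -> new int[2]);

                acc[0] += e.getValue()[0];
                acc[1] += e.getValue()[1];
            }
        }

        List<K> res = new ArrayList<>();

        for (Map.Entry<K, int[]> e : total.entrySet()) {
            int[] c = e.getValue();
            int copies = all ? Math.max(c[0] - c[1], 0) : (c[0] > 0 && c[1] == 0 ? 1 : 0);

            for (int i = 0; i < copies; i++)
                res.add(e.getKey());
        }

        return res;
    }
}
```

The design choice here is to keep map output small (one row per key plus two counters) and defer the final decision to the reducer, since no single node sees all occurrences of a key.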

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+
+/**
+ * Execution node for INTERSECT operator.
+ */
+public class IntersectNode<Row> extends AbstractSetOpNode<Row> {
+    /** */
+    public IntersectNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, int inputsCnt) {
+        super(ctx, rowType, type, all, rowFactory, new IntersectGrouping<>(ctx, rowFactory, type, all, inputsCnt));
+    }
+
+    /** */
+    private static class IntersectGrouping<Row> extends Grouping<Row> {
+        /** Inputs count. */
+        private final int inputsCnt;
+
+        /** */
+        private IntersectGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type,
+            boolean all, int inputsCnt) {
+            super(ctx, rowFactory, type, all);
+
+            this.inputsCnt = inputsCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void endOfSet(int setIdx) {
+            if (type == AggregateType.SINGLE && rowsCnt == 0)
+                groups.clear();
+
+            super.endOfSet(setIdx);
+        }
+
+        /** */

Review comment:
       Fixed
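In SINGLE mode, the `endOfSet` override above clears all groups when an input produced no rows. This assumes `rowsCnt` counts the rows received for the current input. `AbstractSetOpNode#end` then skips the remaining sources once the grouping is empty. Below is a minimal synchronous sketch of that interplay, with lists of keys standing in for inputs. The names are hypothetical.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sequential driver showing the early exit in single-phase INTERSECT. */
final class IntersectEarlyExitSketch {
    private IntersectEarlyExitSketch() {
    }

    /**
     * Reads the input sets one after another and fills key -> per-input counters.
     *
     * @return Number of input sets actually read.
     */
    static <K> int readSets(List<List<K>> sets, Map<K, int[]> groups) {
        int read = 0;

        for (int setIdx = 0; setIdx < sets.size(); setIdx++) {
            List<K> rows = sets.get(setIdx);

            read++;

            for (K key : rows) {
                if (setIdx == 0)
                    groups.computeIfAbsent(key, k -> new int[sets.size()])[0]++;
                else {
                    int[] cntrs = groups.get(key);

                    if (cntrs != null)
                        cntrs[setIdx]++;
                }
            }

            // Like IntersectGrouping#endOfSet: an empty input makes the intersection empty.
            if (rows.isEmpty())
                groups.clear();

            // Like AbstractSetOpNode#end in SINGLE mode: nothing can survive, skip the remaining sources.
            if (groups.isEmpty())
                break;
        }

        return read;
    }
}
```

Without the explicit `clear()`, keys from the first input would keep the grouping non-empty, and every remaining source would still be read even though the result is already known to be empty.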

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
##########
@@ -0,0 +1,118 @@
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+
+/**
+ * Execution node for INTERSECT operator.
+ */
+public class IntersectNode<Row> extends AbstractSetOpNode<Row> {
+    /** */
+    public IntersectNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, int inputsCnt) {
+        super(ctx, rowType, type, all, rowFactory, new IntersectGrouping<>(ctx, rowFactory, type, all, inputsCnt));
+    }
+
+    /** */
+    private static class IntersectGrouping<Row> extends Grouping<Row> {
+        /** Inputs count. */
+        private final int inputsCnt;
+
+        /** */
+        private IntersectGrouping(ExecutionContext<Row> ctx, RowFactory<Row> rowFactory, AggregateType type,
+            boolean all, int inputsCnt) {
+            super(ctx, rowFactory, type, all);
+
+            this.inputsCnt = inputsCnt;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void endOfSet(int setIdx) {
+            if (type == AggregateType.SINGLE && rowsCnt == 0)
+                groups.clear();
+
+            super.endOfSet(setIdx);
+        }
+
+        /** */
+        @Override protected void addOnSingle(Row row, int setIdx) {
+            int[] cntrs;
+
+            GroupKey key = key(row);
+
+            if (setIdx == 0) {
+                cntrs = groups.computeIfAbsent(key, k -> new int[inputsCnt]);
+
+                cntrs[0]++;
+            }
+            else {
+                cntrs = groups.get(key);
+
+                if (cntrs != null) {
+                    if (cntrs[setIdx - 1] == 0)
+                        groups.remove(key);
+                    else
+                        cntrs[setIdx]++;
+                }
+            }
+        }
+
+        /** */

Review comment:
       Fixed
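The counter scheme in `addOnSingle` can be tried outside the execution framework. The sketch below is a standalone model, not Ignite code. `IntersectCountersSketch` is a hypothetical name, and rows are compared by `equals()`/`hashCode()` in place of `GroupKey`. The hunk does not show how `IntersectGrouping` produces its output, so the final step is an assumption based on standard SQL semantics: a row is kept only if every counter is positive, and for INTERSECT ALL it is emitted the minimum number of times.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical standalone model of the per-input counters used by IntersectGrouping. */
final class IntersectCountersSketch {
    /**
     * Computes INTERSECT (all == false) or INTERSECT ALL (all == true), consuming the
     * inputs one after another. Rows are compared with equals()/hashCode() (stand-in for GroupKey).
     */
    static <R> List<R> intersect(List<List<R>> inputs, boolean all) {
        int inputsCnt = inputs.size();

        // Candidate row -> how many times it was seen in each input.
        Map<R, int[]> groups = new HashMap<>();

        for (int setIdx = 0; setIdx < inputsCnt; setIdx++) {
            List<R> input = inputs.get(setIdx);

            for (R row : input) {
                if (setIdx == 0) {
                    groups.computeIfAbsent(row, k -> new int[inputsCnt])[0]++;

                    continue;
                }

                int[] cntrs = groups.get(row);

                if (cntrs == null)
                    continue; // Already eliminated, or never seen in the first input.

                if (cntrs[setIdx - 1] == 0)
                    groups.remove(row); // Missing from the previous input.
                else
                    cntrs[setIdx]++;
            }

            // An empty input empties the intersection (cf. endOfSet()).
            if (input.isEmpty())
                groups.clear();

            // No candidate can come back once all are gone, so skip the remaining inputs.
            if (groups.isEmpty())
                break;
        }

        List<R> res = new ArrayList<>();

        for (Map.Entry<R, int[]> e : groups.entrySet()) {
            int min = Integer.MAX_VALUE;

            for (int c : e.getValue())
                min = Math.min(min, c);

            // min == 0: the row is absent from some input (typically the last one).
            int times = all ? min : Math.min(min, 1);

            for (int i = 0; i < times; i++)
                res.add(e.getKey());
        }

        return res;
    }
}
```

For example, `intersect(List.of(List.of(1, 1, 2, 3), List.of(1, 1, 3, 4), List.of(1, 1, 1, 3)), true)` yields 1, 1 and 3 (in hash-map order), while passing `false` yields 1 and 3.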




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640448648



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.rel.set;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
+import org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
+import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
+import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.internal.processors.query.calcite.util.Commons;
+
+/**
+ * Physical node for MAP phase of set op (MINUS, INTERSECT).
+ */
+public interface IgniteMapSetOp extends IgniteSetOp {
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveRewindability(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        boolean rewindable = inputTraits.stream()
+            .map(TraitUtils::rewindability)
+            .allMatch(RewindabilityTrait::rewindable);
+
+        if (rewindable)
+            return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.REWINDABLE), inputTraits));
+
+        return ImmutableList.of(Pair.of(nodeTraits.replace(RewindabilityTrait.ONE_WAY),
+            Commons.transform(inputTraits, t -> t.replace(RewindabilityTrait.ONE_WAY))));
+    }
+
+    /** {@inheritDoc} */
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveDistribution(
+        RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits
+    ) {
+        if (inputTraits.stream().allMatch(t -> TraitUtils.distribution(t).satisfies(IgniteDistributions.single())))
+            return ImmutableList.of(); // If all distributions are single or broadcast IgniteSingleMinus should be used.
+
+        if (inputTraits.stream().anyMatch(t -> TraitUtils.distribution(t) == IgniteDistributions.single()))
+            return ImmutableList.of(); // Mixing of single and random is prohibited.

Review comment:
       It would probably be better to convert those single-distributed rels, as well as the broadcast ones, to a hash distribution.
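The suggestion works because set operators compare whole rows. If every input is hash-distributed on all of its columns, equal rows from different inputs always land in the same partition, so each partition can compute its share of the result locally. The sketch below models this in plain Java for distinct INTERSECT only. The class name, `partsCnt`, and the use of `hashCode()` as the distribution function are illustrative assumptions, not Ignite's planner API (which works with `RelTraitSet` and `IgniteDistributions`).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model: hash-distribute every input by the whole row, then intersect per partition. */
final class HashPartitionedIntersectSketch {
    /** Splits rows into partsCnt buckets by the row's hash, i.e. a hash distribution over all columns. */
    static <R> List<List<R>> partition(List<R> rows, int partsCnt) {
        List<List<R>> parts = new ArrayList<>();

        for (int i = 0; i < partsCnt; i++)
            parts.add(new ArrayList<>());

        for (R row : rows)
            parts.get(Math.floorMod(row.hashCode(), partsCnt)).add(row);

        return parts;
    }

    /** Distinct INTERSECT of all inputs, computed independently in each partition and then unioned. */
    static <R> Set<R> intersect(List<List<R>> inputs, int partsCnt) {
        List<List<List<R>>> partitioned = new ArrayList<>();

        for (List<R> input : inputs)
            partitioned.add(partition(input, partsCnt));

        Set<R> res = new HashSet<>();

        for (int p = 0; p < partsCnt; p++) {
            // Equal rows of all inputs share partition p, so no other partition is consulted.
            Set<R> local = new HashSet<>(partitioned.get(0).get(p));

            for (int i = 1; i < partitioned.size(); i++)
                local.retainAll(partitioned.get(i).get(p));

            res.addAll(local);
        }

        return res;
    }
}
```

With three partitions, intersecting `[1, 2, 3, 4, 5]`, `[2, 4, 6, 8]` and `[4, 2, 7]` gives `{2, 4}`, the same as a non-partitioned intersection.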

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
##########
@@ -97,4 +73,7 @@ protected IgniteMinusBase(RelInput input) {
 
         return costFactory.makeCost(rows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);

Review comment:
       ```suggestion
           return costFactory.makeCost(inputRows, inputRows * IgniteCost.ROW_PASS_THROUGH_COST, 0, mem, 0);
       ```

##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteSetOp.java
##########
@@ -19,71 +19,47 @@
 
 import java.util.List;
 import com.google.common.collect.ImmutableList;
-import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptCost;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelCollations;
-import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.core.Minus;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.processors.query.calcite.metadata.cost.IgniteCostFactory;
-import org.apache.ignite.internal.processors.query.calcite.rel.IgniteConvention;
-import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
 import org.apache.ignite.internal.processors.query.calcite.trait.TraitsAwareIgniteRel;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 
 /**
- * Base class for physical MINUS (EXCEPT) set op.
+ * Base interface for physical set op node (MINUS, INTERSECT).
  */
-public abstract class IgniteMinusBase extends Minus implements TraitsAwareIgniteRel {
-    /** Count of counter fields used to aggregate results. */
-    protected static final int COUNTER_FIELDS_CNT = 2;
-
-    /** */
-    IgniteMinusBase(RelOptCluster cluster, RelTraitSet traits, List<RelNode> inputs, boolean all) {
-        super(cluster, traits, inputs, all);
-    }
-
-    /** {@inheritDoc} */
-    protected IgniteMinusBase(RelInput input) {
-        super(TraitUtils.changeTraits(input, IgniteConvention.INSTANCE));
-    }
+public interface IgniteSetOp extends TraitsAwareIgniteRel {
+    /** ALL flag of set op. */
+    public boolean all();
 
     /** {@inheritDoc} */
-    @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public default Pair<RelTraitSet, List<RelTraitSet>> passThroughCollation(RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits) {
         // Operation erases collation.
         return Pair.of(nodeTraits.replace(RelCollations.EMPTY),
             Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY)));
     }
 
     /** {@inheritDoc} */
-    @Override public List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits, List<RelTraitSet> inputTraits) {
+    @Override public default List<Pair<RelTraitSet, List<RelTraitSet>>> deriveCollation(RelTraitSet nodeTraits,
+        List<RelTraitSet> inputTraits) {
         // Operation erases collation.
         return ImmutableList.of(Pair.of(nodeTraits.replace(RelCollations.EMPTY),
             Commons.transform(inputTraits, t -> t.replace(RelCollations.EMPTY))));
     }
 
     /** Gets count of fields for aggregation for this node. Required for memory consumption calculation. */
-    protected abstract int aggregateFieldsCount();
-
-    /** {@inheritDoc} */
-    @Override public double estimateRowCount(RelMetadataQuery mq) {
-        final List<RelNode> inputs = getInputs();
-
-        double rows = mq.getRowCount(inputs.get(0));
-
-        for (int i = 1; i < inputs.size(); i++)
-            rows -= 0.5 * Math.min(rows, mq.getRowCount(inputs.get(i)));
-
-        return rows;
-    }
+    public int aggregateFieldsCount();
 
     /** {@inheritDoc} */
-    @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    @Override public default RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {

Review comment:
       The problem with a default method is that it only takes effect if no class in the hierarchy implements the method, but all Ignite rels are actually descendants of AbstractRelNode, which already implements it.
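Java resolves this with the "class wins" rule: a method inherited from a superclass takes precedence over an interface default method with the same signature. A default `computeSelfCost` in `IgniteSetOp` would therefore be silently shadowed by the implementation inherited from `AbstractRelNode`. The sketch below reproduces this with hypothetical stand-ins (`BaseRel` for `AbstractRelNode`, `SetOpRel` for `IgniteSetOp`, a `String` return in place of `RelOptCost`). One possible workaround, shown in `DelegatingRel`, is to override the method in each concrete class and delegate explicitly through `SetOpRel.super`.

```java
/** Hypothetical stand-in for Calcite's AbstractRelNode, which already implements computeSelfCost(). */
abstract class BaseRel {
    public String computeSelfCost() {
        return "base";
    }
}

/** Hypothetical stand-in for IgniteSetOp with a default implementation. */
interface SetOpRel {
    default String computeSelfCost() {
        return "setop-default";
    }
}

/** The method inherited from BaseRel wins; the interface default is never called. */
class ShadowedRel extends BaseRel implements SetOpRel {
}

/** Overriding explicitly and delegating to the interface restores the intended behavior. */
class DelegatingRel extends BaseRel implements SetOpRel {
    @Override public String computeSelfCost() {
        return SetOpRel.super.computeSelfCost();
    }
}
```

Here `new ShadowedRel().computeSelfCost()` returns `"base"`, and `new DelegatingRel().computeSelfCost()` returns `"setop-default"`.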







[GitHub] [ignite] korlov42 commented on a change in pull request #9095: IGNITE-14638 Support for INTERSECT operator

Posted by GitBox <gi...@apache.org>.
korlov42 commented on a change in pull request #9095:
URL: https://github.com/apache/ignite/pull/9095#discussion_r640572493



##########
File path: modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.rel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
+import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Abstract execution node for set operators (EXCEPT, INTERSECT).
+ */
+public abstract class AbstractSetOpNode<Row> extends AbstractNode<Row> {
+    /** */
+    private final AggregateType type;
+
+    /** */
+    private final Grouping<Row> grouping;
+
+    /** */
+    private int requested;
+
+    /** */
+    private int waiting;
+
+    /** Current source index. */
+    private int curSrcIdx;
+
+    /** */
+    private boolean inLoop;
+
+    /** */
+    protected AbstractSetOpNode(ExecutionContext<Row> ctx, RelDataType rowType, AggregateType type, boolean all,
+        RowFactory<Row> rowFactory, Grouping<Row> grouping) {
+        super(ctx, rowType);
+
+        this.type = type;
+        this.grouping = grouping;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void request(int rowsCnt) throws Exception {
+        assert !F.isEmpty(sources());
+        assert rowsCnt > 0 && requested == 0;
+        assert waiting <= 0;
+
+        checkState();
+
+        requested = rowsCnt;
+
+        if (waiting == 0)
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+        else if (!inLoop)
+            context().execute(this::flush, this::onError);
+    }
+
+    /** */
+    public void push(Row row, int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+
+        checkState();
+
+        waiting--;
+
+        grouping.add(row, idx);
+
+        if (waiting == 0)
+            sources().get(curSrcIdx).request(waiting = IN_BUFFER_SIZE);
+    }
+
+    /** */
+    public void end(int idx) throws Exception {
+        assert downstream() != null;
+        assert waiting > 0;
+        assert curSrcIdx == idx;
+
+        checkState();
+
+        grouping.endOfSet(idx);
+
+        if (type == AggregateType.SINGLE && grouping.isEmpty())
+            curSrcIdx = sources().size(); // Skip subsequent sources.
+        else
+            curSrcIdx++;
+
+        if (curSrcIdx >= sources().size()) {
+            waiting = -1;
+
+            flush();
+        }
+        else
+            sources().get(curSrcIdx).request(waiting);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void rewindInternal() {
+        requested = 0;
+        waiting = 0;
+        grouping.groups.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Downstream<Row> requestDownstream(int idx) {
+        return new Downstream<Row>() {
+            @Override public void push(Row row) throws Exception {
+                AbstractSetOpNode.this.push(row, idx);
+            }
+
+            @Override public void end() throws Exception {
+                AbstractSetOpNode.this.end(idx);
+            }
+
+            @Override public void onError(Throwable e) {
+                AbstractSetOpNode.this.onError(e);
+            }
+        };
+    }
+
+    /** */
+    private void flush() throws Exception {
+        if (isClosed())
+            return;
+
+        checkState();
+
+        assert waiting == -1;
+
+        int processed = 0;
+
+        inLoop = true;
+
+        try {
+            while (requested > 0 && !grouping.isEmpty()) {

Review comment:
       Is it possible to iterate over this loop more than once?
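Whether the `while` can run twice depends on the loop body, which the quoted hunk cuts off. Suppose one pass already sends up to `requested` rows, capped at a batch size, and then yields by rescheduling `flush()` when demand remains. In that case the condition is false after the first pass, and an `if` states the intent more plainly. The model below sketches that cooperative-batching pattern under these assumptions: `IN_BUFFER_SIZE = 4`, a plain task queue in place of `context().execute(...)`, and a list in place of the downstream node. `BatchedFlushSketch` is a hypothetical name, not Ignite code.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/**
 * Hypothetical single-threaded model of a flush() step that sends at most one batch
 * per invocation and reschedules itself instead of looping.
 */
final class BatchedFlushSketch {
    /** Assumed batch size; small so that the demo needs more than one batch. */
    static final int IN_BUFFER_SIZE = 4;

    /** Stands in for the rows produced by the grouping. */
    private final Iterator<Integer> rows;

    /** Stands in for context().execute(...): a queue of pending tasks. */
    private final Deque<Runnable> tasks;

    /** Stands in for the downstream node. */
    final List<Integer> downstream = new ArrayList<>();

    /** Whether end() has been signalled downstream. */
    boolean ended;

    private int requested;

    BatchedFlushSketch(List<Integer> result, Deque<Runnable> tasks) {
        this.rows = result.iterator();
        this.tasks = tasks;
    }

    void request(int rowsCnt) {
        requested = rowsCnt;

        flush();
    }

    void flush() {
        if (requested > 0 && rows.hasNext()) {
            int toSnd = Math.min(requested, IN_BUFFER_SIZE);

            for (int i = 0; i < toSnd && rows.hasNext(); i++) {
                requested--;
                downstream.add(rows.next());
            }

            // A full batch was sent and more is wanted: yield instead of looping again.
            if (requested > 0 && rows.hasNext()) {
                tasks.add(this::flush);

                return;
            }
        }

        // Demand is left but the rows are exhausted: signal the end of data.
        if (requested > 0) {
            requested = 0;
            ended = true;
        }
    }
}
```

Requesting 10 rows out of 6 sends 1 to 4 immediately and queues one task; running that task sends 5 and 6 and then signals the end.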



