Posted to dev@drill.apache.org by HanumathRao <gi...@git.apache.org> on 2018/02/04 18:24:09 UTC

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

GitHub user HanumathRao opened a pull request:

    https://github.com/apache/drill/pull/1110

    DRILL-6115: SingleMergeExchange is not scaling up when many minor fra…

    …gments are allocated for a query.
    
    Currently, a SingleMergeExchange merges all of the fragment streams on the foreman. This can cause a CPU bottleneck and heavy memory consumption at the foreman.
    
    This PR introduces a new multiplex operator, OrderedMuxExchange, which merges the minor fragment streams belonging to one drillbit and sends them to the foreman as a single output stream.
    
    The existing multiplex mechanism is used to introduce these operators.
    
    Please review this PR.
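
The idea described above can be sketched outside of Drill as a two-level merge: each drillbit first merges the sorted streams of its own minor fragments, and the foreman then merges only one stream per drillbit. This is a minimal illustration, not the PR's implementation. The class `TwoLevelMergeSketch`, its method names, and the use of in-memory integer lists in place of record batches are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative only: integer lists stand in for sorted record-batch streams.
final class TwoLevelMergeSketch {

  // K-way merge of already-sorted lists using a min-heap of cursors.
  static List<Integer> mergeSorted(List<List<Integer>> sortedStreams) {
    // Each heap entry is {streamIndex, positionInStream}, ordered by the value it points at.
    PriorityQueue<int[]> heap = new PriorityQueue<>(
        Comparator.comparingInt((int[] c) -> sortedStreams.get(c[0]).get(c[1])));
    for (int s = 0; s < sortedStreams.size(); s++) {
      if (!sortedStreams.get(s).isEmpty()) {
        heap.add(new int[] {s, 0});
      }
    }
    List<Integer> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] cursor = heap.poll();
      List<Integer> stream = sortedStreams.get(cursor[0]);
      out.add(stream.get(cursor[1]));
      if (cursor[1] + 1 < stream.size()) {
        heap.add(new int[] {cursor[0], cursor[1] + 1});
      }
    }
    return out;
  }

  // Step 1: each drillbit merges its own minor-fragment streams.
  // Step 2: the foreman merges one stream per drillbit.
  static List<Integer> mergeViaDrillbits(List<List<List<Integer>>> streamsPerDrillbit) {
    List<List<Integer>> perDrillbit = new ArrayList<>();
    for (List<List<Integer>> local : streamsPerDrillbit) {
      perDrillbit.add(mergeSorted(local));
    }
    return mergeSorted(perDrillbit);
  }

  public static void main(String[] args) {
    List<List<List<Integer>>> input = List.of(
        List.of(List.of(1, 4, 9), List.of(2, 3)),   // drillbit A, two minor fragments
        List.of(List.of(0, 5), List.of(6, 7, 8)));  // drillbit B, two minor fragments
    System.out.println(mergeViaDrillbits(input));
  }
}
```

With this arrangement the final merge handles as many inputs as there are drillbits rather than as many as there are minor fragments, which is the scaling benefit the description refers to.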

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HanumathRao/drill DRILL-6115

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1110
    
----
commit 43a71277aeec9bb377181728b2ce563437d7e46d
Author: hmaduri <hm...@...>
Date:   2018-01-22T00:42:28Z

    DRILL-6115: SingleMergeExchange is not scaling up when many minor fragments are allocated for a query.

----


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167956002
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java ---
    @@ -20,133 +20,41 @@
     import com.google.common.collect.Lists;
     
     import org.apache.drill.exec.planner.physical.ExchangePrel;
    -import org.apache.drill.exec.planner.physical.HashPrelUtil;
    -import org.apache.drill.exec.planner.physical.HashPrelUtil.HashExpressionCreatorHelper;
    -import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
     import org.apache.drill.exec.planner.physical.PlannerSettings;
     import org.apache.drill.exec.planner.physical.Prel;
    -import org.apache.drill.exec.planner.physical.ProjectPrel;
    -import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
    -import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
    -import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
    -import org.apache.drill.exec.planner.sql.DrillSqlOperator;
     import org.apache.drill.exec.server.options.OptionManager;
     import org.apache.calcite.rel.RelNode;
    -import org.apache.calcite.rel.type.RelDataType;
    -import org.apache.calcite.rel.type.RelDataTypeField;
    -import org.apache.calcite.rex.RexBuilder;
    -import org.apache.calcite.rex.RexNode;
    -import org.apache.calcite.rex.RexUtil;
    -
    -import java.math.BigDecimal;
    -import java.util.Collections;
     import java.util.List;
     
     public class InsertLocalExchangeVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
    -  private final boolean isMuxEnabled;
    -  private final boolean isDeMuxEnabled;
    -
    -
    -  public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper<RexNode> {
    -    private final RexBuilder rexBuilder;
    +  private final OptionManager options;
     
    -    public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) {
    -      this.rexBuilder = rexBuilder;
    -    }
    -
    -    @Override
    -    public RexNode createCall(String funcName, List<RexNode> inputFields) {
    -      final DrillSqlOperator op =
    -          new DrillSqlOperator(funcName, inputFields.size(), true, false);
    -      return rexBuilder.makeCall(op, inputFields);
    +  private static boolean isMuxEnabled(OptionManager options) {
    +    if (options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val ||
    --- End diff --
    
    use `return` instead of `if`
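
A sketch of what this suggestion probably amounts to: when an `if` exists only to return `true` or `false`, the condition itself can be returned. The class name, the `Map`-based option lookup, and the option keys `flag.a` and `flag.b` are hypothetical stand-ins. They do not reproduce Drill's `OptionManager` or the condition that is cut off in the diff above.

```java
import java.util.Map;

// Hypothetical example; the option names are placeholders, not Drill options.
final class ReturnInsteadOfIf {

  // Verbose form: branches only to return a boolean literal.
  static boolean isEnabledVerbose(Map<String, Boolean> options) {
    if (options.getOrDefault("flag.a", false) || options.getOrDefault("flag.b", false)) {
      return true;
    }
    return false;
  }

  // Suggested form: return the condition directly.
  static boolean isEnabled(Map<String, Boolean> options) {
    return options.getOrDefault("flag.a", false) || options.getOrDefault("flag.b", false);
  }
}
```

Both methods behave identically; the second simply has less to read.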


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r168338663
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---
    @@ -93,6 +94,21 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
         return creator.addMetadata(this, g);
       }
     
    +  /**
    +   * This method creates a new OrderedMux exchange if mux operators are enabled.
    +   * @param child input to the new muxPrel or new SingleMergeExchange node.
    +   * @param options options manager to check if mux is enabled.
    +   */
    +  @Override
    +  public Prel constructMuxPrel(Prel child, OptionManager options) throws RuntimeException {
    +    Prel outPrel = child;
    +    if (options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val) {
    --- End diff --
    
    @amansinha100 Thanks for the review. I have made the required code changes to fix it. Please let me know if anything else is needed.


---

[GitHub] drill issue #1110: DRILL-6115: SingleMergeExchange is not scaling up when ma...

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on the issue:

    https://github.com/apache/drill/pull/1110
  
    @amansinha100 @vrozov  Thank you for the review. I have addressed all the review comments. Please let me know if any changes are required.
    
    The commits are organized such that one commit is for refactoring the existing code and the second one is specific to the changes required for this JIRA. This is done for ease of reviewing the code. 


---

[GitHub] drill issue #1110: DRILL-6115: SingleMergeExchange is not scaling up when ma...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on the issue:

    https://github.com/apache/drill/pull/1110
  
    LGTM.  +1


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167447863
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java ---
    @@ -0,0 +1,116 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl;
    +
    +import org.apache.drill.PlanTestBase;
    +import org.apache.drill.common.expression.ExpressionPosition;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.record.RecordBatchLoader;
    +import org.apache.drill.exec.rpc.user.QueryDataBatch;
    +import org.apache.drill.exec.vector.IntVector;
    +import org.apache.drill.test.ClusterFixture;
    +import org.apache.drill.test.ClusterFixtureBuilder;
    +import org.apache.drill.test.ClientFixture;
    +import org.junit.Test;
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestOrderedMuxExchange extends PlanTestBase {
    +
    +  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
    +
    +
    +  private void validateResults(BufferAllocator allocator, List<QueryDataBatch> results) throws SchemaChangeException {
    +    long previousBigInt = Long.MIN_VALUE;
    +
    +    for (QueryDataBatch b : results) {
    +      RecordBatchLoader loader = new RecordBatchLoader(allocator);
    +      if (b.getHeader().getRowCount() > 0) {
    +        loader.load(b.getHeader().getDef(),b.getData());
    +        @SuppressWarnings({ "deprecation", "resource" })
    +        IntVector c1 = (IntVector) loader.getValueAccessorById(IntVector.class,
    +                   loader.getValueVectorId(new SchemaPath("id_i", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
    +        IntVector.Accessor a1 = c1.getAccessor();
    +
    +        for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
    +          assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt <= a1.get(i));
    +          previousBigInt = a1.get(i);
    +        }
    +      }
    +      loader.clear();
    +      b.release();
    +    }
    +  }
    +
    +  /**
    +   * Test case to verify the OrderedMuxExchange created for order by clause.
    +   * It checks by forcing the plan to create OrderedMuxExchange and also verifies the
    +   * output column is ordered.
    +   *
    +   * @throws Exception if anything goes wrong
    +   */
    +
    +  @Test
    +  public void testOrderedMuxForOrderBy() throws Exception {
    +    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
    +            .maxParallelization(1)
    +            .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
    +            ;
    +
    +    try (ClusterFixture cluster = builder.build();
    +         ClientFixture client = cluster.clientFixture()) {
    +      client.alterSession(ExecConstants.SLICE_TARGET, 10);
    +      String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` ORDER BY id_i limit 10";
    --- End diff --
    
    I am not sure how the table is organized. Does it already have an ordered id_i column? If so, we should use a different column.
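
The concern can be illustrated with a small sketch that is independent of Drill: a "result is sorted" check cannot detect a missing or broken sort if the input was already in order. Whether the mock table's `id_i` column is actually pre-sorted is not established in this thread. The class and method names below are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of why pre-sorted test data weakens an ordering test.
final class OrderedInputPitfall {

  // Same kind of check the test performs: every value is >= its predecessor.
  static boolean isNonDecreasing(List<Integer> values) {
    for (int i = 1; i < values.size(); i++) {
      if (values.get(i - 1) > values.get(i)) {
        return false;
      }
    }
    return true;
  }

  // A deliberately broken "ORDER BY" that returns its input untouched.
  static List<Integer> brokenOrderBy(List<Integer> input) {
    return new ArrayList<>(input);
  }
}
```

On already-sorted input the broken implementation passes the check; on unsorted input it is caught, which is why a column without a pre-existing order makes the stronger test.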


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167092037
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java ---
    @@ -59,6 +59,7 @@ public static void setup() throws Exception {
         ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
             .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 1) // Unmanaged
             .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 1) // Unmanaged
    +        .configProperty(ExecConstants.EXTERNAL_SORT_MAX_MEMORY, 10 * 1024 * 1024) //use less memory for sorting.
    --- End diff --
    
    Why is the change necessary?


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167960149
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java ---
    @@ -37,14 +37,8 @@ public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) {
       }
     
       @Override
    -  public Receiver getReceiver(int minorFragmentId) {
    -    createSenderReceiverMapping();
    -
    -    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
    -    if (senders == null || senders.size() <= 0) {
    -      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    -    }
    -
    +  protected Receiver getReceiverInternal(int oppositeMajorFragmentId,
    +                                         List<MinorFragmentEndpoint> senders, boolean spooling) {
         return new UnorderedReceiver(senderMajorFragmentId, senders, false);
    --- End diff --
    
    Consider creating a helper method such as `getSenders()` that returns `List<MinorFragmentEndpoint>`, instead of `getReceiverInternal()`.


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r168025910
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java ---
    @@ -37,14 +37,8 @@ public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) {
       }
     
       @Override
    -  public Receiver getReceiver(int minorFragmentId) {
    -    createSenderReceiverMapping();
    -
    -    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
    -    if (senders == null || senders.size() <= 0) {
    -      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    -    }
    -
    +  protected Receiver getReceiverInternal(int oppositeMajorFragmentId,
    +                                         List<MinorFragmentEndpoint> senders, boolean spooling) {
         return new UnorderedReceiver(senderMajorFragmentId, senders, false);
    --- End diff --
    
    Done.


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1110


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r168044397
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SingleMergeExchangePrel.java ---
    @@ -93,6 +94,21 @@ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws
         return creator.addMetadata(this, g);
       }
     
    +  /**
    +   * This method creates a new OrderedMux exchange if mux operators are enabled.
    +   * @param child input to the new muxPrel or new SingleMergeExchange node.
    +   * @param options options manager to check if mux is enabled.
    +   */
    +  @Override
    +  public Prel constructMuxPrel(Prel child, OptionManager options) throws RuntimeException {
    +    Prel outPrel = child;
    +    if (options.getOption(PlannerSettings.ORDERED_MUX_EXCHANGE.getOptionName()).bool_val) {
    --- End diff --
    
    @HanumathRao I think the ordered_mux should be created when both mux and ordered_mux flags are enabled.  Users who disable the global 'mux' flag would very likely expect that all mux exchanges are disabled. 
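
The gating suggested here can be written as a single conjunction. This is only a sketch of the proposed rule: the class and method names are hypothetical, and plain booleans stand in for the values read from the option manager.

```java
// Hypothetical helper, not part of the PR.
final class OrderedMuxGate {

  // The ordered-mux flag is honored only while the global mux flag is also on.
  static boolean shouldCreateOrderedMux(boolean muxEnabled, boolean orderedMuxEnabled) {
    return muxEnabled && orderedMuxEnabled;
  }

  public static void main(String[] args) {
    // Print the full truth table for the two flags.
    for (boolean mux : new boolean[] {false, true}) {
      for (boolean ordered : new boolean[] {false, true}) {
        System.out.printf("mux=%b ordered_mux=%b -> insert=%b%n",
            mux, ordered, shouldCreateOrderedMux(mux, ordered));
      }
    }
  }
}
```

Under this rule, disabling the global flag turns off every mux exchange regardless of the more specific setting.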


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167954543
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java ---
    @@ -37,14 +37,8 @@ public UnorderedMuxExchange(@JsonProperty("child") PhysicalOperator child) {
       }
     
       @Override
    -  public Receiver getReceiver(int minorFragmentId) {
    -    createSenderReceiverMapping();
    -
    -    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
    -    if (senders == null || senders.size() <= 0) {
    -      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    -    }
    -
    +  protected Receiver getReceiverInternal(int oppositeMajorFragmentId,
    +                                         List<MinorFragmentEndpoint> senders, boolean spooling) {
         return new UnorderedReceiver(senderMajorFragmentId, senders, false);
    --- End diff --
    
    The `spooling` parameter is ignored. Is this expected?


---

[GitHub] drill issue #1110: DRILL-6115: SingleMergeExchange is not scaling up when ma...

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on the issue:

    https://github.com/apache/drill/pull/1110
  
    @amansinha100  @vrozov  Thanks for the review. I have squashed all the commits into two commits.
    
    Please merge these two commits individually into the apache master branch. The first commit refactors existing code; the second contains the fix for this JIRA.


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167448218
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
    @@ -0,0 +1,64 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.config;
    +
    +import java.util.List;
    +
    +import org.apache.drill.exec.physical.MinorFragmentEndpoint;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.physical.base.Receiver;
    +import org.apache.drill.common.logical.data.Order.Ordering;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +
    +/**
    + * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
    + * merge operation is performed to produced a sorted stream as output.
    + */
    +@JsonTypeName("ordered-mux-exchange")
    +public class OrderedMuxExchange extends AbstractMuxExchange {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
    +
    +  private final List<Ordering> orderings;
    +
    +  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) {
    +    super(child);
    +    this.orderings = orderings;
    +  }
    +
    +  @Override
    +  public Receiver getReceiver(int minorFragmentId) {
    +    createSenderReceiverMapping();
    +
    +    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
    +    if (senders == null || senders.size() <= 0) {
    +      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    +    }
    +
    +    if (logger.isDebugEnabled()) {
    +      logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
    +    }
    +
    +    return new MergingReceiverPOP(senderMajorFragmentId, senders, orderings, false);
    --- End diff --
    
    HashToMergeExchange creates a MergingReceiver with spooling set to TRUE, whereas SingleMergeExchange creates one with spooling set to FALSE. Although we don't test spooling, I feel the new OrderedMuxExchange should probably use the same spooling setting as HashToMergeExchange, since both perform the merge on local drillbits rather than on the foreman.


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167587159
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
    @@ -0,0 +1,64 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.config;
    +
    +import java.util.List;
    +
    +import org.apache.drill.exec.physical.MinorFragmentEndpoint;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.physical.base.Receiver;
    +import org.apache.drill.common.logical.data.Order.Ordering;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +
    +/**
    + * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
    + * merge operation is performed to produced a sorted stream as output.
    + */
    +@JsonTypeName("ordered-mux-exchange")
    +public class OrderedMuxExchange extends AbstractMuxExchange {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
    +
    +  private final List<Ordering> orderings;
    +
    +  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) {
    +    super(child);
    +    this.orderings = orderings;
    +  }
    +
    +  @Override
    +  public Receiver getReceiver(int minorFragmentId) {
    +    createSenderReceiverMapping();
    +
    +    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
    +    if (senders == null || senders.size() <= 0) {
    +      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    +    }
    +
    +    if (logger.isDebugEnabled()) {
    +      logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
    --- End diff --
    
    Use SLF4J parameterized logging, remove the `isDebugEnabled()` check, and consider moving the logging before the exception.
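
With SLF4J, the parameterized call would look like `logger.debug("Minor fragment {} receives data from senders {}", minorFragmentId, senders)`. The message is assembled only when debug logging is enabled, so the explicit guard becomes redundant. The sketch below shows that deferral using only the JDK. `TinyLogger` and `CountingSenders` are hypothetical stand-ins written for this example and are not SLF4J classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in that mimics deferred formatting; it is not SLF4J.
final class TinyLogger {
  private final boolean debugEnabled;
  final List<String> lines = new ArrayList<>();

  TinyLogger(boolean debugEnabled) {
    this.debugEnabled = debugEnabled;
  }

  // Replaces each "{}" with the next argument, but only when debug is on.
  void debug(String pattern, Object... args) {
    if (!debugEnabled) {
      return; // arguments are never converted to strings
    }
    StringBuilder sb = new StringBuilder();
    int from = 0;
    for (Object arg : args) {
      int at = pattern.indexOf("{}", from);
      if (at < 0) {
        break;
      }
      sb.append(pattern, from, at).append(arg);
      from = at + 2;
    }
    sb.append(pattern.substring(from));
    lines.add(sb.toString());
  }
}

// Counts how often toString() runs, to make the formatting cost visible.
final class CountingSenders {
  int toStringCalls = 0;

  @Override
  public String toString() {
    toStringCalls++;
    return "[sender-0, sender-1]";
  }
}
```

By contrast, `logger.debug(String.format(...))` builds the string before the logger can decide whether to discard it.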


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167625240
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
    @@ -0,0 +1,64 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.config;
    +
    +import java.util.List;
    +
    +import org.apache.drill.exec.physical.MinorFragmentEndpoint;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.physical.base.Receiver;
    +import org.apache.drill.common.logical.data.Order.Ordering;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +
    +/**
    + * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
    + * merge operation is performed to produced a sorted stream as output.
    + */
    +@JsonTypeName("ordered-mux-exchange")
    +public class OrderedMuxExchange extends AbstractMuxExchange {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
    +
    +  private final List<Ordering> orderings;
    +
    +  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) {
    +    super(child);
    +    this.orderings = orderings;
    +  }
    +
    +  @Override
    +  public Receiver getReceiver(int minorFragmentId) {
    +    createSenderReceiverMapping();
    --- End diff --
    
    Consider moving this functionality to the parent class and keeping only the creation of the concrete `MergingReceiver` instance in the subclass.
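
One way to read this suggestion is as a template method: the parent class owns the sender lookup and validation, and each subclass supplies only the concrete receiver. The sketch below is an assumption about the shape of that refactoring, not the code that was merged. All class names are hypothetical, and strings stand in for Drill's `Receiver` and `MinorFragmentEndpoint` types.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; these are not Drill's actual classes.
abstract class MuxExchangeSketch {
  private final Map<Integer, List<String>> receiverToSenders;

  MuxExchangeSketch(Map<Integer, List<String>> receiverToSenders) {
    this.receiverToSenders = receiverToSenders;
  }

  // Shared lookup and validation live in the parent and cannot be overridden.
  public final String getReceiver(int minorFragmentId) {
    List<String> senders = receiverToSenders.get(minorFragmentId);
    if (senders == null || senders.isEmpty()) {
      throw new IllegalStateException(
          String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    }
    return createReceiver(senders);
  }

  // Subclasses decide only which concrete receiver to build.
  protected abstract String createReceiver(List<String> senders);
}

final class OrderedMuxSketch extends MuxExchangeSketch {
  OrderedMuxSketch(Map<Integer, List<String>> receiverToSenders) {
    super(receiverToSenders);
  }

  @Override
  protected String createReceiver(List<String> senders) {
    return "merging-receiver" + senders;
  }
}

final class UnorderedMuxSketch extends MuxExchangeSketch {
  UnorderedMuxSketch(Map<Integer, List<String>> receiverToSenders) {
    super(receiverToSenders);
  }

  @Override
  protected String createReceiver(List<String> senders) {
    return "unordered-receiver" + senders;
  }
}
```

Marking `getReceiver` as `final` keeps the validation in one place, so a new exchange type cannot accidentally skip it.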


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167091413
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
    @@ -0,0 +1,58 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.config;
    +
    +import java.util.List;
    +
    +import org.apache.drill.exec.physical.MinorFragmentEndpoint;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.physical.base.Receiver;
    +import org.apache.drill.common.logical.data.Order.Ordering;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +
    +/**
    + * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
    + * merge operation is performed to produced a sorted stream as output.
    + */
    +@JsonTypeName("ordered-mux-exchange")
    +public class OrderedMuxExchange extends AbstractMuxExchange {
    +  private final List<Ordering> orderings;
    +
    +  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, List<Ordering> orderings) {
    +    super(child);
    +    this.orderings = orderings;
    +  }
    +
    +  @Override
    +  public Receiver getReceiver(int minorFragmentId) {
    +    createSenderReceiverMapping();
    +
    +    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
    +    if (senders == null || senders.size() <= 0) {
    --- End diff --
    
    Add debug-level logging for `receiverToSenderMapping` and `minorFragmentId`.


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167090991
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
    @@ -0,0 +1,58 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.config;
    +
    +import java.util.List;
    +
    +import org.apache.drill.exec.physical.MinorFragmentEndpoint;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.physical.base.Receiver;
    +import org.apache.drill.common.logical.data.Order.Ordering;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +
    +/**
    + * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
    + * merge operation is performed to produced a sorted stream as output.
    + */
    +@JsonTypeName("ordered-mux-exchange")
    +public class OrderedMuxExchange extends AbstractMuxExchange {
    +  private final List<Ordering> orderings;
    +
    +  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, List<Ordering> orderings) {
    --- End diff --
    
    Json annotation for orderings?


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167953826
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---
    @@ -90,6 +92,24 @@ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
         return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
       }
     
    +
    +  @Override
    +  public final Receiver getReceiver(int minorFragmentId) {
    +    createSenderReceiverMapping();
    +
    +    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
    +
    +    logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
    --- End diff --
    
    Use SLF4J smart logging instead of `String.format`.


---

[GitHub] drill issue #1110: DRILL-6115: SingleMergeExchange is not scaling up when ma...

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on the issue:

    https://github.com/apache/drill/pull/1110
  
    @vrozov  Thank you for reviewing the code. I have incorporated all the review comments. Please let me know if anything needs to be changed.


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167616026
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedMuxExchange.java ---
    @@ -0,0 +1,64 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.config;
    +
    +import java.util.List;
    +
    +import org.apache.drill.exec.physical.MinorFragmentEndpoint;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.physical.base.Receiver;
    +import org.apache.drill.common.logical.data.Order.Ordering;
    +import com.fasterxml.jackson.annotation.JsonProperty;
    +import com.fasterxml.jackson.annotation.JsonTypeName;
    +
    +/**
    + * OrderedMuxExchange is a version of MuxExchange where the incoming batches are sorted
    + * and a merge operation is performed to produce a sorted stream as output.
    + */
    +@JsonTypeName("ordered-mux-exchange")
    +public class OrderedMuxExchange extends AbstractMuxExchange {
    +  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedMuxExchange.class);
    +
    +  private final List<Ordering> orderings;
    +
    +  public OrderedMuxExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings")List<Ordering> orderings) {
    +    super(child);
    +    this.orderings = orderings;
    +  }
    +
    +  @Override
    +  public Receiver getReceiver(int minorFragmentId) {
    +    createSenderReceiverMapping();
    +
    +    List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
    +    if (senders == null || senders.size() <= 0) {
    +      throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
    +    }
    +
    +    if (logger.isDebugEnabled()) {
    +      logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
    +    }
    +
    +    return new MergingReceiverPOP(senderMajorFragmentId, senders, orderings, false);
    --- End diff --
    
    I don't think that locality plays a role in enabling/disabling spooling. If spooling was disabled for a remote receiver, it should also be disabled for a local receiver.


---

[GitHub] drill issue #1110: DRILL-6115: SingleMergeExchange is not scaling up when ma...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1110
  
    LGTM. Please squash the last two commits.


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167447232
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java ---
    @@ -20,133 +20,34 @@
     import com.google.common.collect.Lists;
     
     import org.apache.drill.exec.planner.physical.ExchangePrel;
    -import org.apache.drill.exec.planner.physical.HashPrelUtil;
    -import org.apache.drill.exec.planner.physical.HashPrelUtil.HashExpressionCreatorHelper;
    -import org.apache.drill.exec.planner.physical.HashToRandomExchangePrel;
     import org.apache.drill.exec.planner.physical.PlannerSettings;
     import org.apache.drill.exec.planner.physical.Prel;
    -import org.apache.drill.exec.planner.physical.ProjectPrel;
    -import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
    -import org.apache.drill.exec.planner.physical.UnorderedDeMuxExchangePrel;
    -import org.apache.drill.exec.planner.physical.UnorderedMuxExchangePrel;
    -import org.apache.drill.exec.planner.sql.DrillSqlOperator;
     import org.apache.drill.exec.server.options.OptionManager;
     import org.apache.calcite.rel.RelNode;
    -import org.apache.calcite.rel.type.RelDataType;
    -import org.apache.calcite.rel.type.RelDataTypeField;
    -import org.apache.calcite.rex.RexBuilder;
    -import org.apache.calcite.rex.RexNode;
    -import org.apache.calcite.rex.RexUtil;
    -
    -import java.math.BigDecimal;
    -import java.util.Collections;
     import java.util.List;
     
     public class InsertLocalExchangeVisitor extends BasePrelVisitor<Prel, Void, RuntimeException> {
    -  private final boolean isMuxEnabled;
    -  private final boolean isDeMuxEnabled;
    -
    -
    -  public static class RexNodeBasedHashExpressionCreatorHelper implements HashExpressionCreatorHelper<RexNode> {
    -    private final RexBuilder rexBuilder;
    -
    -    public RexNodeBasedHashExpressionCreatorHelper(RexBuilder rexBuilder) {
    -      this.rexBuilder = rexBuilder;
    -    }
    -
    -    @Override
    -    public RexNode createCall(String funcName, List<RexNode> inputFields) {
    -      final DrillSqlOperator op =
    -          new DrillSqlOperator(funcName, inputFields.size(), true, false);
    -      return rexBuilder.makeCall(op, inputFields);
    -    }
    -  }
    +  private final OptionManager options;
     
       public static Prel insertLocalExchanges(Prel prel, OptionManager options) {
         boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
         boolean isDeMuxEnabled = options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val;
     
         if (isMuxEnabled || isDeMuxEnabled) {
    -      return prel.accept(new InsertLocalExchangeVisitor(isMuxEnabled, isDeMuxEnabled), null);
    +      return prel.accept(new InsertLocalExchangeVisitor(options), null);
    --- End diff --
    
    Since the local variables `isMuxEnabled`/`isDeMuxEnabled` are not being used anymore, you can remove them on lines 33, 34.


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167088606
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/PrelVisitor.java ---
    @@ -35,7 +38,9 @@
       public RETURN visitScan(ScanPrel prel, EXTRA value) throws EXCEP;
       public RETURN visitJoin(JoinPrel prel, EXTRA value) throws EXCEP;
       public RETURN visitProject(ProjectPrel prel, EXTRA value) throws EXCEP;
    -
    +  public RETURN visitHashToRandomExchange(HashToRandomExchangePrel prel, EXTRA value) throws EXCEP;
    --- End diff --
    
    Are the 3 new methods necessary? Can `visitExchange` delegate to `prel` or use `instanceof`?
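
    A minimal sketch of the delegation alternative raised here, assuming each exchange node can describe its own rewrite. All class and method names below are illustrative stand-ins, not Drill's actual `Prel` or `PrelVisitor` types. The visitor keeps a single `visitExchange` method, and the subtype-specific behaviour lives in an overridable method on the node.

```java
// Illustrative types only; they are not Drill's classes.
abstract class ExchangeNode {
  final String input;

  ExchangeNode(String input) {
    this.input = input;
  }

  abstract String name();

  // Default behaviour: no local (mux) exchange is inserted under this node.
  String plan() {
    return name() + "(" + input + ")";
  }
}

final class HashExchangeNode extends ExchangeNode {
  HashExchangeNode(String input) {
    super(input);
  }

  @Override
  String name() {
    return "HashExchange";
  }

  // Subtype-specific rewrite: place an unordered mux below the exchange.
  @Override
  String plan() {
    return name() + "(UnorderedMux(" + input + "))";
  }
}

final class MergeExchangeNode extends ExchangeNode {
  MergeExchangeNode(String input) {
    super(input);
  }

  @Override
  String name() {
    return "SingleMerge";
  }

  // Subtype-specific rewrite: place an ordered mux below the exchange.
  @Override
  String plan() {
    return name() + "(OrderedMux(" + input + "))";
  }
}

final class BroadcastExchangeNode extends ExchangeNode {
  BroadcastExchangeNode(String input) {
    super(input);
  }

  @Override
  String name() {
    return "Broadcast";
  }
}

final class LocalExchangeVisitorSketch {
  // One visit method for every exchange; the node decides what to build.
  static String visitExchange(ExchangeNode node) {
    return node.plan();
  }

  public static void main(String[] args) {
    System.out.println(visitExchange(new HashExchangeNode("scan")));
    System.out.println(visitExchange(new MergeExchangeNode("sort")));
    System.out.println(visitExchange(new BroadcastExchangeNode("scan")));
  }
}
```

    With this shape, adding another exchange type means overriding one method on the new node rather than adding a method to the visitor interface and to every implementation of it.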


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167586208
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java ---
    @@ -119,7 +119,7 @@ public final void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> rec
       }
     
       @Override
    -  public final <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
    +  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
    --- End diff --
    
    Is this change still required?


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167988964
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---
    @@ -92,24 +92,19 @@ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
         return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
       }
     
    -
    -  @Override
    -  public final Receiver getReceiver(int minorFragmentId) {
    +  protected final List<MinorFragmentEndpoint> getSenders(int minorFragmentId) {
         createSenderReceiverMapping();
     
         List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
     
    -    logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
    +    logger.debug("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders);
    --- End diff --
    
    Use `{}` in place of `%d` and `%s`.
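
    To illustrate why the placeholders matter: SLF4J substitutes arguments only at `{}` anchors, so printf-style `%d` and `%s` would be logged literally. The sketch below uses a small stand-in formatter so it runs without any dependency. The `render` helper is hypothetical and is not SLF4J's implementation (it ignores escaping, for example).

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for SLF4J-style "{}" substitution; not the library's code.
final class PlaceholderSketch {

  // Replaces each "{}" in the pattern with the next argument, left to right.
  static String render(String pattern, Object... args) {
    StringBuilder out = new StringBuilder();
    int from = 0;
    int argIndex = 0;
    while (argIndex < args.length) {
      int at = pattern.indexOf("{}", from);
      if (at < 0) {
        break; // no anchors left: the remaining arguments are ignored
      }
      out.append(pattern, from, at).append(args[argIndex++]);
      from = at + 2;
    }
    return out.append(pattern.substring(from)).toString();
  }

  public static void main(String[] args) {
    List<String> senders = Arrays.asList("host1:0", "host1:1");
    // printf-style specifiers are not anchors, so they pass through untouched.
    System.out.println(render("Minor fragment %d, receives data from following senders %s", 3, senders));
    // "{}" anchors are filled in argument order.
    System.out.println(render("Minor fragment {}, receives data from following senders {}", 3, senders));
  }
}
```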


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by HanumathRao <gi...@git.apache.org>.
Github user HanumathRao commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r168021043
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java ---
    @@ -92,24 +92,19 @@ public Sender getSender(int minorFragmentId, PhysicalOperator child) {
         return new SingleSender(receiverMajorFragmentId, receiver.getId(), child, receiver.getEndpoint());
       }
     
    -
    -  @Override
    -  public final Receiver getReceiver(int minorFragmentId) {
    +  protected final List<MinorFragmentEndpoint> getSenders(int minorFragmentId) {
         createSenderReceiverMapping();
     
         List<MinorFragmentEndpoint> senders = receiverToSenderMapping.get(minorFragmentId);
     
    -    logger.debug(String.format("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders));
    +    logger.debug("Minor fragment %d, receives data from following senders %s", minorFragmentId, senders);
    --- End diff --
    
    Done. 


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167089878
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---
    @@ -55,10 +56,10 @@
     
     
       public RETURN visitExchange(Exchange exchange, EXTRA value) throws EXCEP;
    +  public RETURN visitSingleMergeExchange(SingleMergeExchange exchange, EXTRA value) throws EXCEP;
    --- End diff --
    
    The same question as for `PrelVisitor.java`. Is it necessary to have separate `visitSingleMergeExchange`?


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167448543
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOrderedMuxExchange.java ---
    @@ -0,0 +1,116 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.physical.impl;
    +
    +import org.apache.drill.PlanTestBase;
    +import org.apache.drill.common.expression.ExpressionPosition;
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.exec.ExecConstants;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.memory.BufferAllocator;
    +import org.apache.drill.exec.record.RecordBatchLoader;
    +import org.apache.drill.exec.rpc.user.QueryDataBatch;
    +import org.apache.drill.exec.vector.IntVector;
    +import org.apache.drill.test.ClusterFixture;
    +import org.apache.drill.test.ClusterFixtureBuilder;
    +import org.apache.drill.test.ClientFixture;
    +import org.junit.Test;
    +
    +import java.util.List;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +public class TestOrderedMuxExchange extends PlanTestBase {
    +
    +  private final static String ORDERED_MUX_EXCHANGE = "OrderedMuxExchange";
    +
    +
    +  private void validateResults(BufferAllocator allocator, List<QueryDataBatch> results) throws SchemaChangeException {
    +    long previousBigInt = Long.MIN_VALUE;
    +
    +    for (QueryDataBatch b : results) {
    +      RecordBatchLoader loader = new RecordBatchLoader(allocator);
    +      if (b.getHeader().getRowCount() > 0) {
    +        loader.load(b.getHeader().getDef(),b.getData());
    +        @SuppressWarnings({ "deprecation", "resource" })
    +        IntVector c1 = (IntVector) loader.getValueAccessorById(IntVector.class,
    +                   loader.getValueVectorId(new SchemaPath("id_i", ExpressionPosition.UNKNOWN)).getFieldIds()).getValueVector();
    +        IntVector.Accessor a1 = c1.getAccessor();
    +
    +        for (int i = 0; i < c1.getAccessor().getValueCount(); i++) {
    +          assertTrue(String.format("%d > %d", previousBigInt, a1.get(i)), previousBigInt <= a1.get(i));
    +          previousBigInt = a1.get(i);
    +        }
    +      }
    +      loader.clear();
    +      b.release();
    +    }
    +  }
    +
    +  /**
    +   * Test case to verify the OrderedMuxExchange created for order by clause.
    +   * It checks by forcing the plan to create OrderedMuxExchange and also verifies the
    +   * output column is ordered.
    +   *
    +   * @throws Exception if anything goes wrong
    +   */
    +
    +  @Test
    +  public void testOrderedMuxForOrderBy() throws Exception {
    +    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
    +            .maxParallelization(1)
    +            .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
    +            ;
    +
    +    try (ClusterFixture cluster = builder.build();
    +         ClientFixture client = cluster.clientFixture()) {
    +      client.alterSession(ExecConstants.SLICE_TARGET, 10);
    +      String sql = "SELECT id_i, name_s10 FROM `mock`.`employees_10K` ORDER BY id_i limit 10";
    +
    +      String explainText = client.queryBuilder().sql(sql).explainText();
    +      assertTrue(explainText.contains(ORDERED_MUX_EXCHANGE));
    +      validateResults(client.allocator(), client.queryBuilder().sql(sql).results());
    +    }
    +  }
    +
    +  /**
    +   * Test case to verify the OrderedMuxExchange created for window functions.
    +   * It checks by forcing the plan to create OrderedMuxExchange and also verifies the
    +   * output column is ordered.
    +   *
    +   * @throws Exception if anything goes wrong
    +   */
    +
    +  @Test
    +  public void testOrderedMuxForWindowAgg() throws Exception {
    +    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
    +            .maxParallelization(1)
    +            .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, true)
    +            ;
    +
    +    try (ClusterFixture cluster = builder.build();
    +         ClientFixture client = cluster.clientFixture()) {
    +      client.alterSession(ExecConstants.SLICE_TARGET, 10);
    +      String sql = "SELECT id_i, max(name_s10) over(order by id_i) FROM `mock`.`employees_10K` limit 10";
    +
    +      String explainText = client.queryBuilder().sql(sql).explainText();
    +      assertTrue(explainText.contains(ORDERED_MUX_EXCHANGE));
    +      validateResults(client.allocator(), client.queryBuilder().sql(sql).results());
    --- End diff --
    
    I think a simpler way to validate the results would be to compare them with those of a plan that does not have this mux exchange. The current check uses a hardcoded `IntVector` comparison, which limits the types of queries one can test; a baseline comparison would handle arbitrary data types.
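
    A rough sketch of what such a type-agnostic comparison could look like, assuming both result sets have already been materialized as lists of rows and that the sort key is unique, so row order is deterministic. The `firstMismatch` helper and the row representation are hypothetical and are not part of Drill's test framework.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical helper; rows are plain lists of column values of any type.
final class BaselineComparisonSketch {

  // Returns the index of the first differing row, or -1 when both results match.
  static int firstMismatch(List<? extends List<?>> baseline, List<? extends List<?>> actual) {
    int common = Math.min(baseline.size(), actual.size());
    for (int i = 0; i < common; i++) {
      // List.equals compares element by element, whatever the column types are.
      if (!Objects.equals(baseline.get(i), actual.get(i))) {
        return i;
      }
    }
    return baseline.size() == actual.size() ? -1 : common;
  }

  public static void main(String[] args) {
    // Baseline: rows produced with the mux exchange disabled (made-up values).
    List<List<Object>> baseline = Arrays.asList(
        Arrays.<Object>asList(1, "alice"),
        Arrays.<Object>asList(2, "bob"));
    // Actual: rows produced with the mux exchange enabled (made-up values).
    List<List<Object>> actual = Arrays.asList(
        Arrays.<Object>asList(1, "alice"),
        Arrays.<Object>asList(2, "bob"));
    System.out.println(firstMismatch(baseline, actual));
  }
}
```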


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167626024
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ExchangePrel.java ---
    @@ -34,4 +37,14 @@ public ExchangePrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
         return logicalVisitor.visitExchange(this, value);
       }
     
    +  /**
    +   * The derived classes can override this method to create relevant mux exchanges.
    +   * If this method is not overridden, the default behaviour is to clone itself.
    +   * @param child input to the new muxPrel or new Exchange node.
    +   * @param options options manager to check if mux is enabled.
    +   */
    +  public Prel getMuxPrel(Prel child, OptionManager options) {
    --- End diff --
    
    Consider renaming to `constructMuxPrel`.


---

[GitHub] drill pull request #1110: DRILL-6115: SingleMergeExchange is not scaling up ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1110#discussion_r167589289
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashToRandomExchangePrel.java ---
    @@ -112,6 +120,73 @@ public RelWriter explainTerms(RelWriter pw) {
         return pw;
       }
     
    +  /**
    +   * This method creates a new UnorderedMux and Demux exchanges if mux operators are enabled.
    +   * @param child input to the new Unordered[Mux/Demux]Prel or new HashToRandomExchange node.
    +   * @param options options manager to check if mux is enabled.
    +   */
    +  @Override
    +  public Prel getMuxPrel(Prel child, OptionManager options) {
    +    boolean isMuxEnabled = options.getOption(PlannerSettings.MUX_EXCHANGE.getOptionName()).bool_val;
    +    Prel newPrel = child;
    +
    +    final List<String> childFields = child.getRowType().getFieldNames();
    +
    +    List <RexNode> removeUpdatedExpr = null;
    +
    +    if (isMuxEnabled) {
    +      // Insert Project Operator with new column that will be a hash for HashToRandomExchange fields
    +      final List<DistributionField> distFields = getFields();
    +      final List<String> outputFieldNames = Lists.newArrayList(childFields);
    +      final RexBuilder rexBuilder = getCluster().getRexBuilder();
    +      final List<RelDataTypeField> childRowTypeFields = child.getRowType().getFieldList();
    +
    +      final HashPrelUtil.HashExpressionCreatorHelper<RexNode> hashHelper = new HashPrelUtil.RexNodeBasedHashExpressionCreatorHelper(rexBuilder);
    +      final List<RexNode> distFieldRefs = Lists.newArrayListWithExpectedSize(distFields.size());
    +      for(int i=0; i<distFields.size(); i++) {
    +        final int fieldId = distFields.get(i).getFieldId();
    +        distFieldRefs.add(rexBuilder.makeInputRef(childRowTypeFields.get(fieldId).getType(), fieldId));
    +      }
    +
    +      final List <RexNode> updatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
    +      removeUpdatedExpr = Lists.newArrayListWithExpectedSize(childRowTypeFields.size());
    +      for ( RelDataTypeField field : childRowTypeFields) {
    +        RexNode rex = rexBuilder.makeInputRef(field.getType(), field.getIndex());
    +        updatedExpr.add(rex);
    +        removeUpdatedExpr.add(rex);
    +      }
    +
    +      outputFieldNames.add(HashPrelUtil.HASH_EXPR_NAME);
    +      final RexNode distSeed = rexBuilder.makeBigintLiteral(BigDecimal.valueOf(HashPrelUtil.DIST_SEED)); // distribution seed
    +      updatedExpr.add(HashPrelUtil.createHashBasedPartitionExpression(distFieldRefs, distSeed, hashHelper));
    +
    +      RelDataType rowType = RexUtil.createStructType(getCluster().getTypeFactory(), updatedExpr, outputFieldNames);
    +
    +      ProjectPrel addColumnprojectPrel = new ProjectPrel(child.getCluster(), child.getTraitSet(), child, updatedExpr, rowType);
    +
    +      newPrel = new UnorderedMuxExchangePrel(addColumnprojectPrel.getCluster(), addColumnprojectPrel.getTraitSet(),
    +              addColumnprojectPrel);
    +    }
    +
    +    newPrel = new HashToRandomExchangePrel(getCluster(), getTraitSet(), newPrel, getFields());
    +
    +    if (options.getOption(PlannerSettings.DEMUX_EXCHANGE.getOptionName()).bool_val) {
    +      HashToRandomExchangePrel hashExchangePrel = (HashToRandomExchangePrel) newPrel;
    +      // Insert a DeMuxExchange to narrow down the number of receivers
    +      newPrel = new UnorderedDeMuxExchangePrel(getCluster(), getTraitSet(), hashExchangePrel,
    +              hashExchangePrel.getFields());
    +    }
    +
    +    if ( isMuxEnabled ) {
    --- End diff --
    
    Use consistent formatting.


---

[GitHub] drill issue #1110: DRILL-6115: SingleMergeExchange is not scaling up when ma...

Posted by amansinha100 <gi...@git.apache.org>.
Github user amansinha100 commented on the issue:

    https://github.com/apache/drill/pull/1110
  
    @HanumathRao I have a few comments in the JIRA for the overall design; we can discuss. 


---