Posted to issues@tajo.apache.org by jihoonson <gi...@git.apache.org> on 2015/05/25 14:26:55 UTC

[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

GitHub user jihoonson opened a pull request:

    https://github.com/apache/tajo/pull/583

    TAJO-1553: Improve broadcast join planning

    Sorry for the large patch, but most changes are related to unit tests.
    
    In this patch, I've added ```BroadcastJoinRule``` as a new ```GlobalPlanRewriteRule```.
    BroadcastJoinRule converts a repartition join plan into a broadcast join plan. To describe the broadcast join rules, we first have to define the ```broadcastable``` property of a relation as follows.
    
    _Broadcastable relation:_ A relation is broadcastable when its size is smaller than a given threshold.
    
    I've also assumed that if every input of an execution block is broadcastable, the output of that execution block is also broadcastable.
    
    Finally, here are the rules to convert a repartition join into a broadcast join.
    
    * Given an execution block (EB) containing a join and its child EBs, those EBs can be merged into a single EB if at least one child EB's output is broadcastable.
    * Given a user-defined threshold, the total size of the broadcast relations of an EB cannot exceed that threshold.
     * After merging EBs according to the first rule, the resulting EB may not satisfy the second rule. In this case, repartition join is enforced for the large relations so that the second rule is satisfied.
    * Preserved-row relations cannot be broadcast, to avoid duplicated results. That is, a full outer join cannot be executed as a broadcast join.
     * Here is some brief background for this rule. Rows of preserved-row relations appear in the join result regardless of the join conditions. If multiple tasks execute an outer join with broadcast preserved-row relations, they emit duplicate results.
     * Even though a single task can execute an outer join when every input is broadcastable, broadcast join is not allowed if one of the input relations consists of multiple files.
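
The rules above can be sketched for a single two-input join as follows. This is only an illustration under stated assumptions: `BroadcastPlanningSketch`, `Input`, `isPreserved`, and `chooseBroadcast` are hypothetical names rather than classes from the patch, the byte sizes are made up, and considering the smallest candidates first is an assumed strategy that the description does not spell out. The sketch does not model the merging of EBs or the multiple-files restriction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch; none of these names come from the Tajo code base.
final class BroadcastPlanningSketch {

  enum JoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

  // One input of a binary join: its name, size in bytes, and which side it is on.
  static final class Input {
    final String name;
    final long bytes;
    final boolean leftSide;

    Input(String name, long bytes, boolean leftSide) {
      this.name = name;
      this.bytes = bytes;
      this.leftSide = leftSide;
    }
  }

  // Preserved-row side: left for LEFT_OUTER, right for RIGHT_OUTER, both for FULL_OUTER.
  static boolean isPreserved(JoinType type, boolean leftSide) {
    switch (type) {
      case LEFT_OUTER:
        return leftSide;
      case RIGHT_OUTER:
        return !leftSide;
      case FULL_OUTER:
        return true;
      default:
        return false;
    }
  }

  // Returns the names of the inputs that may be broadcast under the given threshold.
  static List<String> chooseBroadcast(JoinType type, List<Input> inputs, long threshold) {
    List<Input> candidates = new ArrayList<Input>();
    for (Input in : inputs) {
      // Broadcastable means strictly smaller than the threshold;
      // preserved-row inputs are excluded regardless of size.
      if (in.bytes < threshold && !isPreserved(type, in.leftSide)) {
        candidates.add(in);
      }
    }
    // Assumed strategy: consider the smallest relations first.
    Collections.sort(candidates, new Comparator<Input>() {
      @Override
      public int compare(Input a, Input b) {
        return Long.compare(a.bytes, b.bytes);
      }
    });
    List<String> chosen = new ArrayList<String>();
    long total = 0;
    for (Input in : candidates) {
      if (total + in.bytes > threshold) {
        break; // the remaining relations stay on the repartition path
      }
      total += in.bytes;
      chosen.add(in.name);
    }
    return chosen;
  }

  public static void main(String[] args) {
    long threshold = 5L * 1024 * 1024; // illustrative 5 MiB limit
    List<Input> inputs = Arrays.asList(
        new Input("lineitem", 80_000_000_000L, true), // made-up sizes
        new Input("nation", 2_200L, false));
    System.out.println(chooseBroadcast(JoinType.LEFT_OUTER, inputs, threshold)); // [nation]
    System.out.println(chooseBroadcast(JoinType.FULL_OUTER, inputs, threshold)); // []
  }
}
```

With a left outer join, only the right-hand (null-supplying) input is a broadcast candidate; with a full outer join both inputs are preserved, so nothing is broadcast.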

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jihoonson/tajo-2 TAJO-1553

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/tajo/pull/583.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #583
    
----
commit 7d72e8bef78abbeddb96dda20cd93937c0983f4a
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-16T09:47:59Z

    TAJO-1553

commit 62f8ec79508cbc34273546d0e4103cd2d36348d4
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-16T12:21:33Z

    TAJO-1553

commit a47a6025b753e2c2e4c8eb5b486657fd2b2c8d2f
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-17T06:22:51Z

    TAJO-1553

commit 1dfee64762eacd2c2ed1f1e82fd91e6b721a2a91
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-20T05:09:54Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1553

commit 5feaeac20bc38ba3846f0da2e8267c2c164c0ada
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-20T05:59:55Z

    TAJO-1553

commit 81c1318ae4e6f0d440c1d57403aa5f0730a9c434
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-20T07:19:27Z

    TAJO-1553

commit 78c7222c523c76106a76c50565509ab056d2867e
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-27T04:40:49Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1553

commit 1d97567a382308c5534284a8a796257818c0d79f
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-27T09:30:43Z

    Merge branch 'TAJO-1553' of https://github.com/jihoonson/tajo-2 into TAJO-1553

commit 5f43b4e0b4a92d79ce0911d0309f9710d75815dd
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-27T10:51:17Z

    TAJO-1553

commit 70905ddc28473f8049aa845de1fdd275d223cce4
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-27T13:51:08Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1553

commit ae4bc8ec4c05f150760e3dbfc975a2f4a2dec9c8
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-28T08:58:25Z

    TAJO-1553

commit b4d3d2f64475197a85dd285a488aa37572e4b293
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-28T10:18:20Z

    TAJO-1553

commit 722c62e566c4127948c507b9c5ba82b5f2e15fac
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-29T02:22:44Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1553
    
    Conflicts:
    	tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java

commit 068b0ee150f1284600980bf6702760b00b45f252
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-29T02:46:09Z

    TAJO-1553

commit 7938077be9d955faee0f6b3ccfdc64777d9b37f1
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-29T10:06:53Z

    Fix aggregation problem

commit 21df329b03bab523ba448acf18515c9d4a5e591a
Author: Jihoon Son <ji...@apache.org>
Date:   2015-04-30T10:43:19Z

    TAJO-1553

commit ebf12b5c2a78c75c068edd64eee6ffa093889a15
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-04T03:42:18Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1553

commit 06d725c222f5ad210c2a822b009bf22fcd434dd3
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-04T13:23:41Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1553

commit 0af98050a7e2df70af92b6cf91e805cb1b7ce663
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-04T13:23:51Z

    TAJO-1553

commit bb72132627824c5e9a54de537f6808d94bf5635e
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-04T13:25:46Z

    Merge branch 'TAJO-1553' of https://github.com/jihoonson/tajo-2 into TAJO-1553

commit b22fe962b7ed82ba0606f8a7236d68906406cd8b
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-05T04:07:30Z

    TAJO-1553

commit 4cb67c8ff15256ad0b482dbd7ab8d30858006228
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-05T13:15:20Z

    Fix distinct aggregation bug

commit edf427abffd14e30b1ae0d87496258df30d9f798
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-06T01:58:01Z

    Merge branch 'TAJO-1553' of https://github.com/jihoonson/tajo-2 into TAJO-1553

commit 7598a737c0da5111200290992c8660b753fd27cc
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-06T09:09:42Z

    TAJO-1553

commit 1498534b48c88d8960583d9d185e76a631b882b4
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-06T09:50:21Z

    TAJO-1553

commit 8879f8c20ea9b8cfe1b0b1b187632740a7de4285
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-07T14:04:12Z

    TAJO-1553

commit f557ac4d8111f1fda2937b72bf2ebedaf9537d0b
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-08T03:56:11Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1553

commit 5fce06404c2bb0e90303c4833f6cff755e76e6eb
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-08T03:57:25Z

    Merge branch 'TAJO-1553' of https://github.com/jihoonson/tajo-2 into TAJO-1553

commit b2ff12bee38524a0dd47171719b4a9855d09a895
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-08T03:57:32Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into TAJO-1553

commit a6d322ca1307e83e33742fb8600b70b952f50cf7
Author: Jihoon Son <ji...@apache.org>
Date:   2015-05-08T06:03:13Z

    TAJO-1553

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by hyunsik <gi...@git.apache.org>.
Github user hyunsik commented on the pull request:

    https://github.com/apache/tajo/pull/583#issuecomment-105847820
  
    +1 The patch looks good to me. The result looks significant!



[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by hyunsik <gi...@git.apache.org>.
Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/583#discussion_r31094539
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java ---
    @@ -0,0 +1,341 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.engine.planner.global.rewriter.rules;
    +
    +import org.apache.tajo.ExecutionBlockId;
    +import org.apache.tajo.OverridableConf;
    +import org.apache.tajo.SessionVars;
    +import org.apache.tajo.engine.planner.global.ExecutionBlock;
    +import org.apache.tajo.engine.planner.global.GlobalPlanner;
    +import org.apache.tajo.engine.planner.global.MasterPlan;
    +import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRule;
    +import org.apache.tajo.plan.LogicalPlan;
    +import org.apache.tajo.plan.PlanningException;
    +import org.apache.tajo.plan.logical.*;
    +import org.apache.tajo.plan.util.PlannerUtil;
    +import org.apache.tajo.util.TUtil;
    +import org.apache.tajo.util.graph.DirectedGraphVisitor;
    +
    +import java.util.*;
    +
    +/**
    + * {@link BroadcastJoinRule} converts repartition join plan into broadcast join plan.
    + * Broadcast join rules can be defined as follows.
    + *
    + * <h3>Broadcastable relation</h3>
    + * A relation is broadcastable when its size is smaller than a given threshold.
    + *
    + * <h3>Assumption</h3>
    + * If every input of an execution block is broadcastable, the output of the execution block is also broadcastable.
    + *
    + * <h3>Rules to convert repartition join into broadcast join</h3>
    + * <ul>
    + *   <li>Given an EB containing a join and its child EBs, those EBs can be merged into a single EB if at least one child EB's output is broadcastable.</li>
    + *   <li>Given a user-defined threshold, the total size of broadcast relations of an EB cannot exceed such threshold.</li>
    + *   <ul>
    + *     <li>After merging EBs according to the first rule, the result EB may not satisfy the second rule. In this case, enforce repartition join for large relations to satisfy the second rule.</li>
    + *   </ul>
    + *   <li>Preserved-row relations cannot be broadcast, to avoid duplicated results. That is, a full outer join cannot be executed as a broadcast join.</li>
    + *   <ul>
    + *     <li>Here is some brief background for this rule. Rows of preserved-row relations appear in the join result regardless of the join conditions. If multiple tasks execute an outer join with broadcast preserved-row relations, they emit duplicate results.</li>
    + *     <li>Even though a single task can execute an outer join when every input is broadcastable, broadcast join is not allowed if one of the input relations consists of multiple files.</li>
    + *   </ul>
    + * </ul>
    + *
    + */
    +public class BroadcastJoinRule implements GlobalPlanRewriteRule {
    +
    +  private BroadcastJoinPlanBuilder planBuilder;
    +  private BroadcastJoinPlanFinalizer planFinalizer;
    +
    +  @Override
    +  public String getName() {
    +    return "BroadcastJoinRule";
    +  }
    +
    +  @Override
    +  public boolean isEligible(OverridableConf queryContext, MasterPlan plan) {
    +    if (queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED)) {
    +      for (LogicalPlan.QueryBlock block : plan.getLogicalPlan().getQueryBlocks()) {
    +        if (block.hasNode(NodeType.JOIN)) {
    +          long broadcastSizeThreshold = queryContext.getLong(SessionVars.BROADCAST_TABLE_SIZE_LIMIT);
    +          if (broadcastSizeThreshold > 0) {
    +            GlobalPlanRewriteUtil.ParentFinder parentFinder = new GlobalPlanRewriteUtil.ParentFinder();
    +            RelationSizeComparator relSizeComparator = new RelationSizeComparator();
    +            planBuilder = new BroadcastJoinPlanBuilder(plan, relSizeComparator, parentFinder, broadcastSizeThreshold);
    +            planFinalizer = new BroadcastJoinPlanFinalizer(plan, relSizeComparator);
    +            return true;
    +          }
    +        }
    +      }
    +    }
    +    return false;
    +  }
    +
    +  @Override
    +  public MasterPlan rewrite(MasterPlan plan) throws PlanningException{
    +    plan.accept(plan.getRoot().getId(), planBuilder);
    +    plan.accept(plan.getRoot().getId(), planFinalizer);
    +    return plan;
    +  }
    +
    +  private static class RelationSizeComparator implements Comparator<ScanNode> {
    +
    +    @Override
    +    public int compare(ScanNode o1, ScanNode o2) {
    +      return (int) (GlobalPlanRewriteUtil.getTableVolume(o1) - GlobalPlanRewriteUtil.getTableVolume(o2));
    --- End diff --
    
    It seems to cause an overflow when the difference between the two table volumes exceeds the range of an int value.
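
A minimal sketch of the concern, assuming table volumes are plain `long` byte counts. `VolumeComparators`, `NARROWING`, and `SAFE` are hypothetical names used only for illustration; the real comparator works on `ScanNode` and `GlobalPlanRewriteUtil.getTableVolume`, which are left out so that the example runs without Tajo on the classpath. `NARROWING` mirrors the subtract-and-cast pattern from the diff, while `SAFE` uses `Long.compare` (available since Java 7).

```java
import java.util.Comparator;

// Hypothetical stand-ins for the comparator in the diff; sizes are plain longs.
final class VolumeComparators {

  // Mirrors the pattern under review: subtract, then narrow the long result to int.
  static final Comparator<Long> NARROWING = new Comparator<Long>() {
    @Override
    public int compare(Long a, Long b) {
      return (int) (a - b);
    }
  };

  // Compares the two longs directly, so the sign of the result is always correct.
  static final Comparator<Long> SAFE = new Comparator<Long>() {
    @Override
    public int compare(Long a, Long b) {
      return Long.compare(a, b);
    }
  };

  public static void main(String[] args) {
    long small = 0L;
    long fourGiB = 1L << 32;  // low 32 bits are all zero
    long threeGiB = 3L << 30; // bit 31 is set, so the cast yields a negative int

    System.out.println(NARROWING.compare(fourGiB, small));      // 0
    System.out.println(NARROWING.compare(threeGiB, small) < 0); // true
    System.out.println(SAFE.compare(fourGiB, small) > 0);       // true
    System.out.println(SAFE.compare(threeGiB, small) > 0);      // true
  }
}
```

The narrowing cast keeps only the low 32 bits of the difference, so a 4 GiB table can compare as equal to an empty one and a 3 GiB table can compare as smaller.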



[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by jihoonson <gi...@git.apache.org>.
Github user jihoonson commented on the pull request:

    https://github.com/apache/tajo/pull/583#issuecomment-105843248
  
    I focused on improving the broadcast join planning in this patch. I evaluated performance with an example query for which Tajo currently cannot fully exploit the broadcast join opportunity.
    ### Query
    ```
    default> select r_name, case when l_shipmode is null then 'N/O' else l_shipmode end as s1 from region inner join ( select * from lineitem left outer join nation on l_suppkey = n_nationkey ) t on n_regionkey = r_regionkey
    ```
    ### Data
    * Input: TPC-H scale factor of 100
    * Result: 14484 rows (185.7 KiB)
    
    ### Performance
    * Before patch: 226.915 sec
    * After patch: 49.647 sec
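    
    As a quick check of what these two timings imply (plain arithmetic on the numbers above, not an additional measurement):
    
    ```latex
    \text{speedup} = \frac{226.915\ \text{s}}{49.647\ \text{s}} \approx 4.57,
    \qquad
    \text{time saved} = 1 - \frac{49.647}{226.915} \approx 78\%
    ```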
    
    ### Query plan
    ##### Before patch
    ```
    -------------------------------------------------------------------------------
    Execution Block Graph (TERMINAL - eb_1432720176114_0001_000006)
    -------------------------------------------------------------------------------
    |-eb_1432720176114_0001_000006
       |-eb_1432720176114_0001_000005
          |-eb_1432720176114_0001_000004
          |-eb_1432720176114_0001_000001
    -------------------------------------------------------------------------------
    Order of Execution
    -------------------------------------------------------------------------------
    1: eb_1432720176114_0001_000001
    2: eb_1432720176114_0001_000004
    3: eb_1432720176114_0001_000005
    4: eb_1432720176114_0001_000006
    -------------------------------------------------------------------------------
    
    =======================================================
    Block Id: eb_1432720176114_0001_000001 [LEAF]
    =======================================================
    
    [Outgoing]
    [q_1432720176114_0001] 1 => 5 (type=HASH_SHUFFLE, key=tpch100.region.r_regionkey (INT8), num=1186)
    
    SCAN(0) on tpch100.region
      => target list: tpch100.region.r_name (TEXT), tpch100.region.r_regionkey (INT8)
      => out schema: {(2) tpch100.region.r_name (TEXT), tpch100.region.r_regionkey (INT8)}
      => in schema: {(3) tpch100.region.r_regionkey (INT8), tpch100.region.r_name (TEXT), tpch100.region.r_comment (TEXT)}
    
    =======================================================
    Block Id: eb_1432720176114_0001_000004 [LEAF]
    =======================================================
    
    [Outgoing]
    [q_1432720176114_0001] 4 => 5 (type=HASH_SHUFFLE, key=tpch100.t.n_regionkey (INT8), num=1186)
    
    [Enforcers]
     0: type=Broadcast, tables=tpch100.nation
    
    TABLE_SUBQUERY(5) as tpch100.t
      => Targets: CASE WHEN tpch100.t.l_shipmode (TEXT) IS NULL THEN N/O ELSE tpch100.t.l_shipmode (TEXT) END as s1, tpch100.t.n_regionkey (INT8)
      => out schema: {(2) s1 (TEXT), tpch100.t.n_regionkey (INT8)}
      => in  schema: {(20) tpch100.t.l_orderkey (INT8), tpch100.t.l_partkey (INT8), tpch100.t.l_suppkey (INT8), tpch100.t.l_linenumber (INT8), tpch100.t.l_quantity (FLOAT8), tpch100.t.l_extendedprice (FLOAT8), tpch100.t.l_discount (FLOAT8), tpch100.t.l_tax (FLOAT8), tpch100.t.l_returnflag (TEXT), tpch100.t.l_linestatus (TEXT), tpch100.t.l_shipdate (TEXT), tpch100.t.l_commitdate (TEXT), tpch100.t.l_receiptdate (TEXT), tpch100.t.l_shipinstruct (TEXT), tpch100.t.l_shipmode (TEXT), tpch100.t.l_comment (TEXT), tpch100.t.n_nationkey (INT8), tpch100.t.n_name (TEXT), tpch100.t.n_regionkey (INT8), tpch100.t.n_comment (TEXT)}
       PROJECTION(4)
         => Targets: tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)
         => out schema: {(20) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
         => in  schema: {(20) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
          JOIN(9)(LEFT_OUTER)
            => Join Cond: tpch100.lineitem.l_suppkey (INT8) = tpch100.nation.n_nationkey (INT8)
            => target list: tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)
            => out schema: {(20) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
            => in schema: {(20) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
             SCAN(2) on tpch100.nation
               => target list: tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)
               => out schema: {(4) tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
               => in schema: {(4) tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
             SCAN(1) on tpch100.lineitem
               => target list: tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT)
               => out schema: {(16) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT)}
               => in schema: {(16) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT)}
    
    =======================================================
    Block Id: eb_1432720176114_0001_000005 [ROOT]
    =======================================================
    
    [Incoming]
    [q_1432720176114_0001] 1 => 5 (type=HASH_SHUFFLE, key=tpch100.region.r_regionkey (INT8), num=1186)
    [q_1432720176114_0001] 4 => 5 (type=HASH_SHUFFLE, key=tpch100.t.n_regionkey (INT8), num=1186)
    
    JOIN(10)(INNER)
      => Join Cond: tpch100.t.n_regionkey (INT8) = tpch100.region.r_regionkey (INT8)
      => target list: tpch100.region.r_name (TEXT), s1 (TEXT)
      => out schema: {(2) tpch100.region.r_name (TEXT), s1 (TEXT)}
      => in schema: {(4) tpch100.region.r_name (TEXT), tpch100.region.r_regionkey (INT8), s1 (TEXT), tpch100.t.n_regionkey (INT8)}
       SCAN(13) on eb_1432720176114_0001_000004
         => out schema: {(2) s1 (TEXT), tpch100.t.n_regionkey (INT8)}
         => in schema: {(2) s1 (TEXT), tpch100.t.n_regionkey (INT8)}
       SCAN(12) on eb_1432720176114_0001_000001
         => out schema: {(2) tpch100.region.r_name (TEXT), tpch100.region.r_regionkey (INT8)}
         => in schema: {(2) tpch100.region.r_name (TEXT), tpch100.region.r_regionkey (INT8)}
    
    =======================================================
    Block Id: eb_1432720176114_0001_000006 [TERMINAL]
    =======================================================
    ```
    ##### After patch
    ```
    -------------------------------------------------------------------------------
    Execution Block Graph (TERMINAL - eb_1432719753330_0002_000006)
    -------------------------------------------------------------------------------
    |-eb_1432719753330_0002_000006
       |-eb_1432719753330_0002_000005
    -------------------------------------------------------------------------------
    Order of Execution
    -------------------------------------------------------------------------------
    1: eb_1432719753330_0002_000005
    2: eb_1432719753330_0002_000006
    -------------------------------------------------------------------------------
    
    =======================================================
    Block Id: eb_1432719753330_0002_000005 [ROOT]
    =======================================================
    
    [Enforcers]
     0: type=Broadcast, tables=tpch100.region
     1: type=Broadcast, tables=tpch100.nation
    
    JOIN(10)(INNER)
      => Join Cond: tpch100.t.n_regionkey (INT8) = tpch100.region.r_regionkey (INT8)
      => target list: tpch100.region.r_name (TEXT), s1 (TEXT)
      => out schema: {(2) tpch100.region.r_name (TEXT), s1 (TEXT)}
      => in schema: {(4) tpch100.region.r_name (TEXT), tpch100.region.r_regionkey (INT8), s1 (TEXT), tpch100.t.n_regionkey (INT8)}
       TABLE_SUBQUERY(5) as tpch100.t
         => Targets: CASE WHEN tpch100.t.l_shipmode (TEXT) IS NULL THEN N/O ELSE tpch100.t.l_shipmode (TEXT) END as s1, tpch100.t.n_regionkey (INT8)
         => out schema: {(2) s1 (TEXT), tpch100.t.n_regionkey (INT8)}
         => in  schema: {(20) tpch100.t.l_orderkey (INT8), tpch100.t.l_partkey (INT8), tpch100.t.l_suppkey (INT8), tpch100.t.l_linenumber (INT8), tpch100.t.l_quantity (FLOAT8), tpch100.t.l_extendedprice (FLOAT8), tpch100.t.l_discount (FLOAT8), tpch100.t.l_tax (FLOAT8), tpch100.t.l_returnflag (TEXT), tpch100.t.l_linestatus (TEXT), tpch100.t.l_shipdate (TEXT), tpch100.t.l_commitdate (TEXT), tpch100.t.l_receiptdate (TEXT), tpch100.t.l_shipinstruct (TEXT), tpch100.t.l_shipmode (TEXT), tpch100.t.l_comment (TEXT), tpch100.t.n_nationkey (INT8), tpch100.t.n_name (TEXT), tpch100.t.n_regionkey (INT8), tpch100.t.n_comment (TEXT)}
          PROJECTION(4)
            => Targets: tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)
            => out schema: {(20) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
            => in  schema: {(20) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
             JOIN(9)(LEFT_OUTER)
               => Join Cond: tpch100.lineitem.l_suppkey (INT8) = tpch100.nation.n_nationkey (INT8)
               => target list: tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)
               => out schema: {(20) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
               => in schema: {(20) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT), tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
                SCAN(2) on tpch100.nation
                  => target list: tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)
                  => out schema: {(4) tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
                  => in schema: {(4) tpch100.nation.n_nationkey (INT8), tpch100.nation.n_name (TEXT), tpch100.nation.n_regionkey (INT8), tpch100.nation.n_comment (TEXT)}
                SCAN(1) on tpch100.lineitem
                  => target list: tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT)
                  => out schema: {(16) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT)}
                  => in schema: {(16) tpch100.lineitem.l_orderkey (INT8), tpch100.lineitem.l_partkey (INT8), tpch100.lineitem.l_suppkey (INT8), tpch100.lineitem.l_linenumber (INT8), tpch100.lineitem.l_quantity (FLOAT8), tpch100.lineitem.l_extendedprice (FLOAT8), tpch100.lineitem.l_discount (FLOAT8), tpch100.lineitem.l_tax (FLOAT8), tpch100.lineitem.l_returnflag (TEXT), tpch100.lineitem.l_linestatus (TEXT), tpch100.lineitem.l_shipdate (TEXT), tpch100.lineitem.l_commitdate (TEXT), tpch100.lineitem.l_receiptdate (TEXT), tpch100.lineitem.l_shipinstruct (TEXT), tpch100.lineitem.l_shipmode (TEXT), tpch100.lineitem.l_comment (TEXT)}
       SCAN(0) on tpch100.region
         => target list: tpch100.region.r_name (TEXT), tpch100.region.r_regionkey (INT8)
         => out schema: {(2) tpch100.region.r_name (TEXT), tpch100.region.r_regionkey (INT8)}
         => in schema: {(3) tpch100.region.r_regionkey (INT8), tpch100.region.r_name (TEXT), tpch100.region.r_comment (TEXT)}
    
    =======================================================
    Block Id: eb_1432719753330_0002_000006 [TERMINAL]
    =======================================================
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by jihoonson <gi...@git.apache.org>.
Github user jihoonson commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/583#discussion_r31097759
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/BroadcastJoinRule.java ---
    @@ -0,0 +1,341 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.tajo.engine.planner.global.rewriter.rules;
    +
    +import org.apache.tajo.ExecutionBlockId;
    +import org.apache.tajo.OverridableConf;
    +import org.apache.tajo.SessionVars;
    +import org.apache.tajo.engine.planner.global.ExecutionBlock;
    +import org.apache.tajo.engine.planner.global.GlobalPlanner;
    +import org.apache.tajo.engine.planner.global.MasterPlan;
    +import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRule;
    +import org.apache.tajo.plan.LogicalPlan;
    +import org.apache.tajo.plan.PlanningException;
    +import org.apache.tajo.plan.logical.*;
    +import org.apache.tajo.plan.util.PlannerUtil;
    +import org.apache.tajo.util.TUtil;
    +import org.apache.tajo.util.graph.DirectedGraphVisitor;
    +
    +import java.util.*;
    +
    +/**
    + * {@link BroadcastJoinRule} converts a repartition join plan into a broadcast join plan.
    + * Broadcast join rules can be defined as follows.
    + *
    + * <h3>Broadcastable relation</h3>
    + * A relation is broadcastable when its size is smaller than a given threshold.
    + *
    + * <h3>Assumption</h3>
    + * If every input of an execution block is broadcastable, the output of the execution block is also broadcastable.
    + *
    + * <h3>Rules to convert repartition join into broadcast join</h3>
    + * <ul>
    + *   <li>Given an EB containing a join and its child EBs, those EBs can be merged into a single EB if at least one child EB's output is broadcastable.</li>
    + *   <li>Given a user-defined threshold, the total size of the broadcast relations of an EB cannot exceed that threshold.</li>
    + *   <ul>
    + *     <li>After merging EBs according to the first rule, the result EB may not satisfy the second rule. In this case, enforce repartition join for large relations to satisfy the second rule.</li>
    + *   </ul>
    + *   <li>Preserved-row relations cannot be broadcast, to avoid duplicate results. That is, a full outer join cannot be executed as a broadcast join.</li>
    + *   <ul>
    + *     <li>Brief background for this rule: rows of preserved-row relations appear in the join result regardless of the join condition. If multiple tasks execute an outer join with a broadcast preserved-row relation, they emit duplicate results.</li>
    + *     <li>Even though a single task can execute an outer join when every input is broadcastable, broadcast join is not allowed if one of the input relations consists of multiple files.</li>
    + *   </ul>
    + * </ul>
    + *
    + */
    +public class BroadcastJoinRule implements GlobalPlanRewriteRule {
    +
    +  private BroadcastJoinPlanBuilder planBuilder;
    +  private BroadcastJoinPlanFinalizer planFinalizer;
    +
    +  @Override
    +  public String getName() {
    +    return "BroadcastJoinRule";
    +  }
    +
    +  @Override
    +  public boolean isEligible(OverridableConf queryContext, MasterPlan plan) {
    +    if (queryContext.getBool(SessionVars.TEST_BROADCAST_JOIN_ENABLED)) {
    +      for (LogicalPlan.QueryBlock block : plan.getLogicalPlan().getQueryBlocks()) {
    +        if (block.hasNode(NodeType.JOIN)) {
    +          long broadcastSizeThreshold = queryContext.getLong(SessionVars.BROADCAST_TABLE_SIZE_LIMIT);
    +          if (broadcastSizeThreshold > 0) {
    +            GlobalPlanRewriteUtil.ParentFinder parentFinder = new GlobalPlanRewriteUtil.ParentFinder();
    +            RelationSizeComparator relSizeComparator = new RelationSizeComparator();
    +            planBuilder = new BroadcastJoinPlanBuilder(plan, relSizeComparator, parentFinder, broadcastSizeThreshold);
    +            planFinalizer = new BroadcastJoinPlanFinalizer(plan, relSizeComparator);
    +            return true;
    +          }
    +        }
    +      }
    +    }
    +    return false;
    +  }
    +
    +  @Override
    +  public MasterPlan rewrite(MasterPlan plan) throws PlanningException{
    +    plan.accept(plan.getRoot().getId(), planBuilder);
    +    plan.accept(plan.getRoot().getId(), planFinalizer);
    +    return plan;
    +  }
    +
    +  private static class RelationSizeComparator implements Comparator<ScanNode> {
    +
    +    @Override
    +    public int compare(ScanNode o1, ScanNode o2) {
    +      return (int) (GlobalPlanRewriteUtil.getTableVolume(o1) - GlobalPlanRewriteUtil.getTableVolume(o2));
    --- End diff --
    
    Improved to consider overflow.
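
For readers following along, the overflow in question comes from narrowing a `long` difference to `int`: once two volumes differ by more than `Integer.MAX_VALUE` bytes (roughly 2 GiB), the cast can flip the sign or collapse to zero. The sketch below contrasts that pattern with `Long.compare`. The `Relation` class and both comparator names are hypothetical stand-ins rather than code from the patch, and this thread does not show which fix was actually committed.

```java
import java.util.Comparator;

// Hypothetical stand-in for ScanNode; only the table volume matters here.
final class Relation {
  final String name;
  final long volume; // size in bytes

  Relation(String name, long volume) {
    this.name = name;
    this.volume = volume;
  }
}

final class RelationSizeComparators {
  // Same shape as the line in the quoted diff: the long difference is cast to int.
  static final Comparator<Relation> NARROWING =
      (a, b) -> (int) (a.volume - b.volume);

  // Overflow-safe alternative: compare the long values directly.
  static final Comparator<Relation> SAFE =
      (a, b) -> Long.compare(a.volume, b.volume);
}
```

With volumes of 0 bytes and 3 GiB, the narrowing comparator reports the 3 GiB relation as the smaller one, whereas `Long.compare` orders them correctly.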



[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by hyunsik <gi...@git.apache.org>.
Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/583#discussion_r31093369
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---
    @@ -120,25 +171,49 @@ public boolean hasUnion() {
         return hasUnionPlan;
       }
     
    -  public void addBroadcastTable(String tableName) {
    -    broadcasted.add(tableName);
    -    enforcer.addBroadcast(tableName);
    +  public boolean isUnionOnly() {
    +    return isUnionOnly;
       }
     
    -  public void removeBroadcastTable(String tableName) {
    -    broadcasted.remove(tableName);
    -    enforcer.removeBroadcast(tableName);
    +  public void addBroadcastRelation(ScanNode relationNode) {
    --- End diff --
    
    Is there a reason for changing the suffix from 'Table' to 'Relation'?



[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by jihoonson <gi...@git.apache.org>.
Github user jihoonson commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/583#discussion_r31097737
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java ---
    @@ -86,6 +86,23 @@ public void addSortedInput(String tableName, SortSpec[] sortSpecs) {
         TUtil.putToNestedList(properties, builder.getType(), builder.build());
       }
     
    +  public void removeSortedInput(String tableName) {
    --- End diff --
    
    Removed.



[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by jihoonson <gi...@git.apache.org>.
Github user jihoonson commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/583#discussion_r31094037
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---
    @@ -39,8 +40,38 @@
     
       private boolean hasJoinPlan;
       private boolean hasUnionPlan;
    -
    -  private Set<String> broadcasted = new HashSet<String>();
    +  private boolean isUnionOnly;
    +
    +  private Map<String, ScanNode> broadcastRelations = TUtil.newHashMap();
    +
    +  /*
    +   * An execution block is null-supplying or preserved-row when its output is used as an input for outer join.
    +   * These flags are set according to the type of outer join.
    +   * Here are brief descriptions for these flags.
    +   *
    +   * 1) left outer join
    +   *
    +   *        left outer join
    +   *          /        \
    +   * preserved-row  null-supplying
    +   *
    +   * 2) right outer join
    +   *
    +   *        right outer join
    +   *          /        \
    +   * null-supplying  preserved-row
    +   *
    +   * 3) full outer join
    +   *
    +   *        full outer join
    +   *          /        \
    +   * null-supplying  preserved-row
    +   * preserved-row   null-supplying
    +   *
    +   * The null-supplying and preserved-row flags are used to find which relations will be broadcasted.
    +   */
    +  protected transient boolean nullSuppllying = false;
    --- End diff --
    
    Thanks. I'll remove it.
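
The preserved-row / null-supplying comment in the quoted diff can be read as a small lookup from join type to the role of each input. The sketch below is one way to express it, assuming a hypothetical `SketchJoinType` enum and `JoinInputRoles` class that are not part of Tajo. It also encodes the rule from the PR description that preserved-row inputs must not be broadcast, and it ignores the size threshold entirely.

```java
// Hypothetical join-type enum for illustration; not Tajo's own JoinType.
enum SketchJoinType { INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER }

// Role flags for the left and right inputs of a join.
final class JoinInputRoles {
  final boolean leftPreservedRow;
  final boolean leftNullSupplying;
  final boolean rightPreservedRow;
  final boolean rightNullSupplying;

  private JoinInputRoles(boolean leftPreservedRow, boolean leftNullSupplying,
                         boolean rightPreservedRow, boolean rightNullSupplying) {
    this.leftPreservedRow = leftPreservedRow;
    this.leftNullSupplying = leftNullSupplying;
    this.rightPreservedRow = rightPreservedRow;
    this.rightNullSupplying = rightNullSupplying;
  }

  // Follows the three diagrams in the quoted comment; an inner join sets no flags.
  static JoinInputRoles of(SketchJoinType type) {
    switch (type) {
      case LEFT_OUTER:
        return new JoinInputRoles(true, false, false, true);
      case RIGHT_OUTER:
        return new JoinInputRoles(false, true, true, false);
      case FULL_OUTER:
        // Each side is both preserved-row and null-supplying.
        return new JoinInputRoles(true, true, true, true);
      default:
        return new JoinInputRoles(false, false, false, false);
    }
  }

  // Per the PR description, a preserved-row input must not be broadcast.
  boolean mayBroadcastLeft() {
    return !leftPreservedRow;
  }

  boolean mayBroadcastRight() {
    return !rightPreservedRow;
  }
}
```

Under this reading, a left outer join may broadcast only its right input, a right outer join only its left input, and a full outer join neither.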



[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by hyunsik <gi...@git.apache.org>.
Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/583#discussion_r31094300
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java ---
    @@ -86,6 +86,23 @@ public void addSortedInput(String tableName, SortSpec[] sortSpecs) {
         TUtil.putToNestedList(properties, builder.getType(), builder.build());
       }
     
    +  public void removeSortedInput(String tableName) {
    --- End diff --
    
    This does not seem to be used in this patch.



[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by jihoonson <gi...@git.apache.org>.
Github user jihoonson commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/583#discussion_r31094151
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---
    @@ -120,25 +171,49 @@ public boolean hasUnion() {
         return hasUnionPlan;
       }
     
    -  public void addBroadcastTable(String tableName) {
    -    broadcasted.add(tableName);
    -    enforcer.addBroadcast(tableName);
    +  public boolean isUnionOnly() {
    +    return isUnionOnly;
       }
     
    -  public void removeBroadcastTable(String tableName) {
    -    broadcasted.remove(tableName);
    -    enforcer.removeBroadcast(tableName);
    +  public void addBroadcastRelation(ScanNode relationNode) {
    --- End diff --
    
    I used the term 'broadcastable' in this patch. Combining that term with 'table', as in broadcastableTable, would confuse readers.



[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/tajo/pull/583



[GitHub] tajo pull request: TAJO-1553: Improve broadcast join planning

Posted by hyunsik <gi...@git.apache.org>.
Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/583#discussion_r31093213
  
    --- Diff: tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---
    @@ -39,8 +40,38 @@
     
       private boolean hasJoinPlan;
       private boolean hasUnionPlan;
    -
    -  private Set<String> broadcasted = new HashSet<String>();
    +  private boolean isUnionOnly;
    +
    +  private Map<String, ScanNode> broadcastRelations = TUtil.newHashMap();
    +
    +  /*
    +   * An execution block is null-supplying or preserved-row when its output is used as an input for outer join.
    +   * These flags are set according to the type of outer join.
    +   * Here are brief descriptions for these flags.
    +   *
    +   * 1) left outer join
    +   *
    +   *        left outer join
    +   *          /        \
    +   * preserved-row  null-supplying
    +   *
    +   * 2) right outer join
    +   *
    +   *        right outer join
    +   *          /        \
    +   * null-supplying  preserved-row
    +   *
    +   * 3) full outer join
    +   *
    +   *        full outer join
    +   *          /        \
    +   * null-supplying  preserved-row
    +   * preserved-row   null-supplying
    +   *
    +   * The null-supplying and preserved-row flags are used to find which relations will be broadcasted.
    +   */
    +  protected transient boolean nullSuppllying = false;
    --- End diff --
    
    'transient' does not seem to be necessary.
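
As background for this remark, `transient` only has an effect with mechanisms that honor it, such as Java's built-in object serialization, which skips transient fields. Whether `ExecutionBlock` is ever serialized that way is not shown in this thread. The sketch below demonstrates the behavior with a hypothetical `FlaggedBlock` class that is not part of Tajo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical class for illustration; not Tajo's ExecutionBlock.
final class FlaggedBlock implements Serializable {
  private static final long serialVersionUID = 1L;

  boolean preservedRow = false;            // written by Java serialization
  transient boolean nullSupplying = false; // skipped by Java serialization
}

final class TransientDemo {
  // Serializes the block to a byte array and reads it back.
  static FlaggedBlock roundTrip(FlaggedBlock block) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(block);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (FlaggedBlock) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

After a round trip, a `preservedRow` value of `true` survives, while `nullSupplying` comes back as the default `false`. If the class is never serialized, the modifier changes nothing.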

