Posted to dev@spark.apache.org by Rajesh Balamohan <ra...@gmail.com> on 2016/04/12 14:32:02 UTC

SparkSQL - Limit pushdown on BroadcastHashJoin

Hi,

I ran the following query in Spark (latest master codebase) and it took a
lot of time to complete even though it was a broadcast hash join.

It appears that the limit is applied only after the complete join has been
computed. Shouldn't the limit be pushed into BroadcastHashJoin (so that it
stops processing after generating 10 rows)? Please let me know if my
understanding of this is wrong.


select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;

>>>>
| == Physical Plan ==
CollectLimit 10
+- WholeStageCodegen
   :  +- Project [l_partkey#893]
   :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
   :        :- Project [l_partkey#893]
   :        :  +- Filter isnotnull(l_partkey#893)
   :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct<l_partkey:int>
   :        +- INPUT
   +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
      +- WholeStageCodegen
         :  +- Project [ps_partkey#908]
         :     +- Filter isnotnull(ps_partkey#908)
         :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:int>  |
>>>>




-- 
~Rajesh.B

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

Posted by Zhan Zhang <zz...@hortonworks.com>.
Thanks Reynold.

I am not sure why doExecute is not invoked, since CollectLimit does not support whole-stage codegen:

case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {

I will dig further into this.

Zhan Zhang

On Apr 18, 2016, at 10:36 PM, Reynold Xin <rx...@databricks.com> wrote:

Anyway, we can verify this easily. I just added a println for each row and verified that only limit + 1 rows were printed after the join and before the limit.

It'd be great if you could do some debugging yourself and see if it is going through some other code path.


On Mon, Apr 18, 2016 at 10:35 PM, Reynold Xin <rx...@databricks.com> wrote:
But doExecute is not called?

On Mon, Apr 18, 2016 at 10:32 PM, Zhan Zhang <zz...@hortonworks.com> wrote:
Hi Reynold,

I just checked the code for CollectLimit; there is a shuffle that collects the rows into one partition.


protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output, SinglePartition, serializer))
  shuffled.mapPartitionsInternal(_.take(limit))
}

Thus, there is no way to avoid processing all data before the shuffle. I think that is the reason. Do I understand correctly?

Thanks.

Zhan Zhang
On Apr 18, 2016, at 10:08 PM, Reynold Xin <rx...@databricks.com> wrote:

Unless I'm really missing something, I don't think so. As I said, it goes through an iterator, and after processing each stream-side row we do a shouldStop check. The generated code looks like this:

  protected void processNext() throws java.io.IOException {
    /*** PRODUCE: Project [id#79L] */

    /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */

    /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */

    // initialize Range
    if (!range_initRange) {
      range_initRange = true;
      initRange(partitionIndex);
    }

    while (!range_overflow && range_number < range_partitionEnd) {
      long range_value = range_number;
      range_number += 1L;
      if (range_number < range_value ^ 1L < 0) {
        range_overflow = true;
      }

      /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */

      // generate join key for stream side

      // find matches from HashedRelation
      UnsafeRow bhj_matched = false ? null: (UnsafeRow)bhj_relation.getValue(range_value);
      if (bhj_matched == null) continue;

      bhj_metricValue.add(1);

      /*** CONSUME: Project [id#79L] */

      System.out.println("i got one row");

      /*** CONSUME: WholeStageCodegen */

      project_rowWriter.write(0, range_value);
      append(project_result);

      if (shouldStop()) return;
    }
  }
}


shouldStop returns true once we go past the limit.



On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <zz...@hortonworks.com> wrote:
From the physical plan, the limit is one level above the WholeStageCodegen node, so I don't think shouldStop would work here. To make it work, the limit has to be part of the WholeStageCodegen.

Correct me if I am wrong.

Thanks.

Zhan Zhang

On Apr 18, 2016, at 11:09 AM, Reynold Xin <rx...@databricks.com> wrote:

I could be wrong, but I think we currently do that through whole-stage codegen. After processing every row on the stream side, the generated code for the broadcast join checks whether it has hit the limit or not (through this thing called shouldStop).

It is not an optimal solution, because a single stream-side row might output multiple hits, but that is usually not a problem.


On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ra...@gmail.com> wrote:
While you can't automatically push the limit *through* the join, we could push it *into* the join (stop processing after generating 10 records). I believe that is what Rajesh is suggesting.

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hv...@questtec.nl> wrote:
I am not sure if you can push a limit through a join. This becomes problematic if not all keys are present on both sides; in such a case a limit can produce fewer rows than the set limit.
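
To make the concern about pushing a limit through a join concrete, here is a small Java sketch. It is only an illustration with made-up data: the class LimitThroughJoinSketch, its methods, and the sample keys are assumptions for this example and have nothing to do with Spark's optimizer. The join is modelled as a filter of the left keys against a set of right keys, which assumes the right-side keys are unique.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration; the "join" is a key-membership filter on unique right keys.
class LimitThroughJoinSketch {

    // Correct order: join first, then keep at most `limit` joined rows.
    static List<Integer> joinThenLimit(List<Integer> left, Set<Integer> right, int limit) {
        return left.stream().filter(right::contains).limit(limit).collect(Collectors.toList());
    }

    // Limit pushed below the join: only the first `limit` left rows are ever probed.
    static List<Integer> limitThenJoin(List<Integer> left, Set<Integer> right, int limit) {
        return left.stream().limit(limit).filter(right::contains).collect(Collectors.toList());
    }

    // Sample left side: keys 1..20.
    static List<Integer> sampleLeft() {
        return IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
    }

    // Sample right side: only the even keys 2..20, so half of the left keys have no match.
    static Set<Integer> sampleRight() {
        Set<Integer> evens = new HashSet<>();
        for (int k = 2; k <= 20; k += 2) evens.add(k);
        return evens;
    }

    public static void main(String[] args) {
        System.out.println(joinThenLimit(sampleLeft(), sampleRight(), 5)); // [2, 4, 6, 8, 10]
        System.out.println(limitThenJoin(sampleLeft(), sampleRight(), 5)); // [2, 4]
    }
}
```

With these sample keys, limiting before the join returns two rows even though five matching rows exist, which is the "fewer rows than the set limit" case.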

This might be a rare case in which whole stage codegen is slower, due to the fact that we need to buffer the result of such a stage. You could try to disable it by setting "spark.sql.codegen.wholeStage" to false.

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

Posted by Reynold Xin <rx...@databricks.com>.
Anyway, we can verify this easily. I just added a println for each row and
verified that only limit + 1 rows were printed after the join and before the
limit.

It'd be great if you could do some debugging yourself and see if it is going
through some other code path.



Re: SparkSQL - Limit pushdown on BroadcastHashJoin

Posted by Reynold Xin <rx...@databricks.com>.
But doExecute is not called?


Re: SparkSQL - Limit pushdown on BroadcastHashJoin

Posted by Zhan Zhang <zz...@hortonworks.com>.
Hi Reynold,

I just checked the code for CollectLimit; there is a shuffle that collects the rows into one partition.


protected override def doExecute(): RDD[InternalRow] = {
  val shuffled = new ShuffledRowRDD(
    ShuffleExchange.prepareShuffleDependency(
      child.execute(), child.output, SinglePartition, serializer))
  shuffled.mapPartitionsInternal(_.take(limit))
}

Thus, there is no way to avoid processing all data before the shuffle. I think that is the reason. Do I understand correctly?
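
The concern in the paragraph above can be sketched in plain Java. This is a toy model under stated assumptions, not Spark code: the class ShuffleThenLimitSketch and its helpers are hypothetical, and the "shuffle" is modelled simply as draining the child iterator into a list before any row is read back. It only shows what would happen if execution really went through such a path.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical toy model; the "shuffle" is just a full materialization of the child.
class ShuffleThenLimitSketch {

    // Child operator that counts how many rows it has been asked to produce.
    static final class CountingRows implements Iterator<Integer> {
        private final int total;
        private int produced = 0;

        CountingRows(int total) { this.total = total; }

        @Override public boolean hasNext() { return produced < total; }

        @Override public Integer next() {
            if (produced >= total) throw new NoSuchElementException();
            return produced++;
        }

        int produced() { return produced; }
    }

    // Limit applied after a shuffle boundary: every child row is produced first.
    static int producedWithShuffle(int total, int limit) {
        CountingRows child = new CountingRows(total);
        List<Integer> shuffled = new ArrayList<>();
        while (child.hasNext()) shuffled.add(child.next());              // drain the whole child
        shuffled.subList(0, Math.min(limit, shuffled.size())).size();    // then take(limit)
        return child.produced();
    }

    // Limit applied directly on the child iterator: pulling stops after `limit` rows.
    static int producedWithoutShuffle(int total, int limit) {
        CountingRows child = new CountingRows(total);
        int taken = 0;
        while (taken < limit && child.hasNext()) {
            child.next();
            taken++;
        }
        return child.produced();
    }

    public static void main(String[] args) {
        System.out.println(producedWithShuffle(1000, 10));    // 1000
        System.out.println(producedWithoutShuffle(1000, 10)); // 10
    }
}
```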

Thanks.

Zhan Zhang

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

Posted by Reynold Xin <rx...@databricks.com>.
Unless I'm really missing something, I don't think so. As I said, it goes
through an iterator, and after processing each stream-side row we do a
shouldStop check. The generated code looks like this:

  protected void processNext() throws java.io.IOException {
    /*** PRODUCE: Project [id#79L] */

    /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */

    /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */

    // initialize Range
    if (!range_initRange) {
      range_initRange = true;
      initRange(partitionIndex);
    }

    while (!range_overflow && range_number < range_partitionEnd) {
      long range_value = range_number;
      range_number += 1L;
      if (range_number < range_value ^ 1L < 0) {
        range_overflow = true;
      }

      /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */

      // generate join key for stream side

      // find matches from HashedRelation
      UnsafeRow bhj_matched = false ? null: (UnsafeRow)bhj_relation.getValue(range_value);
      if (bhj_matched == null) continue;

      bhj_metricValue.add(1);

      /*** CONSUME: Project [id#79L] */

      System.out.println("i got one row");

      /*** CONSUME: WholeStageCodegen */

      project_rowWriter.write(0, range_value);
      append(project_result);

      if (shouldStop()) return;  // <-- the shouldStop check
    }
  }
}


shouldStop returns true once we go past the limit.
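
As a rough illustration of this mechanism (not Spark's generated code), the following self-contained Java sketch models an iterator whose processNext() returns as soon as a row has been buffered. The class BufferedJoinSketch, its fields, and the even-number build side are all made up for this example. The assumptions are that shouldStop() simply reports a non-empty output buffer and that the consumer stops pulling once it has `limit` rows.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a buffered, codegen-style iterator; not Spark's actual class.
class BufferedJoinSketch {
    private final Deque<Long> buffer = new ArrayDeque<>(); // rows produced but not yet consumed
    private final Set<Long> buildSide;                     // stands in for the broadcast hash relation
    private final long end;                                // exclusive upper bound of the stream side
    private long cursor = 0;                               // next stream-side value to probe
    private long probed = 0;                               // stream-side rows examined so far

    BufferedJoinSketch(Set<Long> buildSide, long end) {
        this.buildSide = buildSide;
        this.end = end;
    }

    // Assumption: "stop" means there is at least one buffered row to hand out.
    private boolean shouldStop() {
        return !buffer.isEmpty();
    }

    // Same shape as the generated loop: probe, append, then check shouldStop().
    private void processNext() {
        while (cursor < end) {
            long value = cursor++;
            probed++;
            if (!buildSide.contains(value)) continue; // no match on the build side
            buffer.add(value);
            if (shouldStop()) return;
        }
    }

    boolean hasNext() {
        if (buffer.isEmpty()) processNext();
        return !buffer.isEmpty();
    }

    long next() {
        return buffer.remove();
    }

    long probed() {
        return probed;
    }

    // Pulls at most `limit` joined rows and reports how many stream rows were probed.
    static long probedForLimit(int limit, long streamSize) {
        Set<Long> evens = new HashSet<>();
        for (long i = 0; i < streamSize; i += 2) evens.add(i);
        BufferedJoinSketch rows = new BufferedJoinSketch(evens, streamSize);
        int taken = 0;
        while (taken < limit && rows.hasNext()) {
            rows.next();
            taken++;
        }
        return rows.probed();
    }

    public static void main(String[] args) {
        System.out.println(probedForLimit(10, 1000)); // prints 19
    }
}
```

In this model only 19 of the 1000 stream-side values are probed for a limit of 10, because the loop hands control back after every buffered row and the consumer never asks for an eleventh.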




Re: SparkSQL - Limit pushdown on BroadcastHashJoin

Posted by Zhan Zhang <zz...@hortonworks.com>.
From the physical plan, the limit is one level above the WholeStageCodegen node, so I don't think shouldStop would work here. To make it work, the limit has to be part of the WholeStageCodegen.

Correct me if I am wrong.

Thanks.

Zhan Zhang

On Apr 18, 2016, at 11:09 AM, Reynold Xin <rx...@databricks.com>> wrote:

I could be wrong but I think we currently do that through whole stage codegen. After processing every row on the stream side, the generated code for broadcast join checks whether it has hit the limit or not (through this thing called shouldStop).

It is not the most optimal solution, because a single stream side row might output multiple hits, but it is usually not a problem.


On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ra...@gmail.com> wrote:
While you can't automatically push the limit *through* the join, we could push it *into* the join (stop processing after generating 10 records). I believe that is what Rajesh is suggesting.

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hv...@questtec.nl> wrote:
I am not sure if you can push a limit through a join. This becomes problematic if not all keys are present on both sides; in such a case a limit can produce fewer rows than the set limit.

This might be a rare case in which whole stage codegen is slower, due to the fact that we need to buffer the result of such a stage. You could try to disable it by setting "spark.sql.codegen.wholeStage" to false.

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

Posted by Reynold Xin <rx...@databricks.com>.
I could be wrong but I think we currently do that through whole stage
codegen. After processing every row on the stream side, the generated code
for broadcast join checks whether it has hit the limit or not (through this
thing called shouldStop).

It is not the most optimal solution, because a single stream side row might
output multiple hits, but it is usually not a problem.
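
To make the per-row check concrete, here is a minimal sketch in plain Scala. It is a simplified model, not Spark's generated code: the object name, the `probe` method, and the in-memory `Map` standing in for the broadcast relation are all assumptions made for illustration. The stop condition is evaluated once per stream-side row, so a row with several matches can push the output past the limit.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of a broadcast hash join probe loop.
// This is NOT Spark's generated code; all names are illustrative.
object ShouldStopSketch {
  // Returns (joined rows emitted, number of stream-side rows consumed).
  def probe(
      stream: Iterator[Int],
      build: Map[Int, Seq[String]],
      limit: Int): (Seq[(Int, String)], Int) = {
    val out = ArrayBuffer.empty[(Int, String)]
    var consumed = 0
    // Stand-in for the shouldStop check: evaluated once per stream-side row.
    def shouldStop: Boolean = out.size >= limit
    while (!shouldStop && stream.hasNext) {
      val key = stream.next()
      consumed += 1
      // One stream row may match several build rows, so output can overshoot.
      for (v <- build.getOrElse(key, Seq.empty)) out += ((key, v))
    }
    (out.toSeq, consumed)
  }
}
```

With a limit of 2 and a first stream row that matches three build rows, this model emits three rows after consuming a single stream row, which is the "multiple hits" overshoot described above. A limit operator above the join would still trim the result to the requested size.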


On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ra...@gmail.com> wrote:

> While you can't automatically push the limit *through* the join, we could
> push it *into* the join (stop processing after generating 10 records). I
> believe that is what Rajesh is suggesting.
>
> On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <
> hvanhovell@questtec.nl> wrote:
>
>> I am not sure if you can push a limit through a join. This becomes
>> problematic if not all keys are present on both sides; in such a case a
>> limit can produce fewer rows than the set limit.
>>
>> This might be a rare case in which whole stage codegen is slower, due to
>> the fact that we need to buffer the result of such a stage. You could try
>> to disable it by setting "spark.sql.codegen.wholeStage" to false.

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

Posted by Andrew Ray <ra...@gmail.com>.
While you can't automatically push the limit *through* the join, we could
push it *into* the join (stop processing after generating 10 records). I
believe that is what Rajesh is suggesting.
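
A rough way to picture pushing the limit *into* the join is a lazy pipeline in plain Scala, where taking the first N joined rows stops pulling from the stream side. This is only a sketch under assumptions: `LimitIntoJoinSketch`, `joinWithLimit`, and the `Map` used as the build side are hypothetical names, not Spark APIs.

```scala
// Hypothetical model of "limit pushed into the join" using lazy iterators.
// Not Spark code; names and data structures are illustrative only.
object LimitIntoJoinSketch {
  // Returns (first `limit` joined rows, number of stream-side rows read).
  def joinWithLimit(
      stream: Seq[Int],
      build: Map[Int, Seq[String]],
      limit: Int): (List[(Int, String)], Int) = {
    var read = 0
    // Count every stream-side row that is actually pulled.
    val counted = stream.iterator.map { k => read += 1; k }
    // Lazy inner join: each stream key is expanded into its matches on demand.
    val joined = counted.flatMap { k =>
      build.getOrElse(k, Seq.empty).iterator.map(v => (k, v))
    }
    // take(limit) stops requesting rows once enough output exists.
    val rows = joined.take(limit).toList
    (rows, read)
  }
}
```

In this model the join itself is unchanged; only the consumer stops early, so the result is still the first N rows of the full join while most of the stream side is never read.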

On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <
hvanhovell@questtec.nl> wrote:

> I am not sure if you can push a limit through a join. This becomes
> problematic if not all keys are present on both sides; in such a case a
> limit can produce fewer rows than the set limit.
>
> This might be a rare case in which whole stage codegen is slower, due to
> the fact that we need to buffer the result of such a stage. You could try
> to disable it by setting "spark.sql.codegen.wholeStage" to false.

Re: SparkSQL - Limit pushdown on BroadcastHashJoin

Posted by Herman van Hövell tot Westerflier <hv...@questtec.nl>.
I am not sure if you can push a limit through a join. This becomes
problematic if not all keys are present on both sides; in such a case a
limit can produce fewer rows than the set limit.
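
A small plain-Scala sketch of why the limit cannot simply be moved below the join. It assumes unique keys on the right side so that each match yields exactly one row, and all names (`LimitThroughJoinSketch`, `innerJoin`, and so on) are hypothetical.

```scala
// Hypothetical illustration of limit-after-join vs. limit-before-join.
// Assumes the right side holds unique keys; not Spark code.
object LimitThroughJoinSketch {
  // Simplified inner join on key: keep left keys that exist on the right.
  def innerJoin(left: Seq[Int], right: Set[Int]): Seq[Int] =
    left.filter(right.contains)

  // Original plan: join first, then apply the limit.
  def limitAfterJoin(left: Seq[Int], right: Set[Int], n: Int): Seq[Int] =
    innerJoin(left, right).take(n)

  // Invalid rewrite: limit pushed through the join onto the left input.
  def limitBeforeJoin(left: Seq[Int], right: Set[Int], n: Int): Seq[Int] =
    innerJoin(left.take(n), right)
}
```

For example, with left keys 1 to 6, right keys {4, 5, 6}, and a limit of 2, the original plan returns two rows (4 and 5), while the rewritten plan keeps only left keys 1 and 2, finds no matches, and returns nothing.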

This might be a rare case in which whole stage codegen is slower, due to
the fact that we need to buffer the result of such a stage. You could try
to disable it by setting "spark.sql.codegen.wholeStage" to false.
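
For reference, a sketch of setting that flag from Scala. This assumes the `SparkSession` API from Spark 2.x, which may not match the entry point available on master at the time of this thread, and it needs Spark on the classpath. The application name and the local master are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: assumes a Spark 2.x-style SparkSession is available.
object DisableWholeStageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")                 // placeholder master for a local run
      .appName("disable-wholestage")      // placeholder application name
      .getOrCreate()

    // Turn off whole stage codegen for queries run in this session.
    spark.conf.set("spark.sql.codegen.wholeStage", "false")
    println(spark.conf.get("spark.sql.codegen.wholeStage"))

    spark.stop()
  }
}
```

Re-running the query with `explain()` after this change should show a plan without the WholeStageCodegen nodes, which makes it easier to compare timings between the two modes.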
