Posted to dev@drill.apache.org by Venki Korukanti <ve...@gmail.com> on 2015/02/02 20:21:17 UTC

Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/
-----------------------------------------------------------

(Updated Feb. 2, 2015, 7:21 p.m.)


Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.


Repository: drill-git


Description
-------

In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am debugging). I am sending this patch to get initial feedback.

Brief overview of changes:
1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before the HashToRandomExchangePrel and a DemuxExchange after it.
2. Parallelization changes:
   i) Traverse the physical operator tree and divide it into Fragments.
   ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
   iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
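
The dependency-driven ordering in step 2 can be pictured with a small sketch. This is only an illustration under stated assumptions, not the code in the patch: FragmentOrderSketch, parallelizationOrder, and the string fragment names are hypothetical, and the dependency graph is assumed to be acyclic. It starts from the fragments that no other fragment depends on and resolves each fragment's dependencies before the fragment itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; names are illustrative and are not Drill classes.
final class FragmentOrderSketch {

  /**
   * dependsOn maps a fragment name to the fragments whose parallelization
   * info it needs. Returns an order in which every fragment appears after
   * all of its dependencies. Assumes the dependency graph has no cycles.
   */
  static List<String> parallelizationOrder(Map<String, List<String>> dependsOn) {
    // Start from the fragments that no other fragment depends on.
    Set<String> starts = new LinkedHashSet<>(dependsOn.keySet());
    for (List<String> deps : dependsOn.values()) {
      starts.removeAll(deps);
    }
    Set<String> done = new LinkedHashSet<>();
    for (String fragment : starts) {
      visit(fragment, dependsOn, done);
    }
    return new ArrayList<>(done);
  }

  private static void visit(String fragment,
                            Map<String, List<String>> dependsOn,
                            Set<String> done) {
    if (done.contains(fragment)) {
      return; // already handled
    }
    // Handle the dependencies first, then the fragment itself.
    List<String> deps = dependsOn.getOrDefault(fragment, Collections.<String>emptyList());
    for (String dep : deps) {
      visit(dep, dependsOn, done);
    }
    done.add(fragment);
  }

  public static void main(String[] args) {
    Map<String, List<String>> dependsOn = new LinkedHashMap<>();
    dependsOn.put("f0", Arrays.asList("f1"));
    dependsOn.put("f1", Arrays.asList("f2"));
    dependsOn.put("f2", Collections.<String>emptyList());
    System.out.println(parallelizationOrder(dependsOn));
  }
}
```

In this sketch a fragment is only recorded once everything it depends on has been recorded, which is the property the parallelizer needs before it can use another fragment's width or endpoint assignment.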


Diffs
-----

  exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java 23860a3 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java 52462db 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java 94411ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 79603eb 
  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java 4c12d57 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java dc016be 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b0206f7 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b33042b 
  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 

Diff: https://reviews.apache.org/r/30466/diff/


Testing
-------

Ran Functional and TPCH SF100 parquet verification tests.


Thanks,

Venki Korukanti


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.

> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java, line 33
> > <https://reviews.apache.org/r/30466/diff/1/?file=842669#file842669line33>
> >
> >     It looks like endpoint should be final.

updated.


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java, line 45
> > <https://reviews.apache.org/r/30466/diff/1/?file=842669#file842669line45>
> >
> >     If I created this for a particular endpoint, why would I want to be able to change it?
> >     
> >     If I do change it, why doesn't that invalidate the affinity value?

This method is not needed. Removed it.


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java, line 54
> > <https://reviews.apache.org/r/30466/diff/1/?file=842669#file842669line54>
> >
> >     What does it mean for the two endpoint values not to be equal? How do you expect this to be used?

We use this for sorting the EndpointAffinity list based on the affinity. Added a comment.


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java, line 66
> > <https://reviews.apache.org/r/30466/diff/1/?file=842669#file842669line66>
> >
> >     Can this condition ever occur? Is it actually possible to reach POSITIVE_INFINITY by adding to the affinity as in addAffinity() (and the condition in there also looks suspicious)?

We use POSITIVE_INFINITY to indicate that the endpoint in this object must have an assignment. One example is Screen: the fragment containing Screen should always be assigned to the node where the Foreman for the query is present.
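
As a rough illustration of the two points above (sorting by affinity, and using infinite affinity as a "must assign" marker), here is a minimal sketch. AffinitySketch, its fields, and its methods are hypothetical stand-ins and are not the EndpointAffinity class in the patch; the endpoint is a plain string rather than a DrillbitEndpoint.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; this is not the EndpointAffinity class from the patch.
final class AffinitySketch {
  final String endpoint;  // stand-in for a DrillbitEndpoint
  final double affinity;

  AffinitySketch(String endpoint, double affinity) {
    this.endpoint = endpoint;
    this.affinity = affinity;
  }

  // Infinite affinity marks an endpoint that must receive an assignment.
  boolean isAssignmentRequired() {
    return affinity == Double.POSITIVE_INFINITY;
  }

  // Returns a copy sorted by descending affinity, so required endpoints come first.
  static List<AffinitySketch> sortByAffinity(List<AffinitySketch> input) {
    List<AffinitySketch> sorted = new ArrayList<>(input);
    sorted.sort((a, b) -> Double.compare(b.affinity, a.affinity));
    return sorted;
  }

  public static void main(String[] args) {
    List<AffinitySketch> sorted = sortByAffinity(Arrays.asList(
        new AffinitySketch("node-a", 1.0),
        new AffinitySketch("foreman-node", Double.POSITIVE_INFINITY),
        new AffinitySketch("node-b", 2.5)));
    for (AffinitySketch a : sorted) {
      System.out.println(a.endpoint + " required=" + a.isAssignmentRequired());
    }
  }
}
```

Double.compare orders POSITIVE_INFINITY above every finite value, so in this sketch an endpoint with mandatory assignment always sorts ahead of endpoints with ordinary affinity.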


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java, line 29
> > <https://reviews.apache.org/r/30466/diff/1/?file=842670#file842670line29>
> >
> >     Why are these not final (and set in an AbstractExchange constructor)?
> >     
> >     In the MuxExchange below, the sender major fragment id is referenced, but has never been set. It looks like these aren't both used. Perhaps they shouldn't be declared in this class, but each class that does need them should declare whichever one it needs itself.

These are set when assigning the sender endpoint list in AbstractExchange.setupSenders() (similar method for receiver)


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java, line 52
> > <https://reviews.apache.org/r/30466/diff/1/?file=842671#file842671line52>
> >
> >     This will make a new (empty) list each time it is called. Shouldn't the class be hanging on to its currently known operator affinity list, even an empty one?

We are returning the empty list, which is a static final in Collections. Most implementations of GroupScan cache the result and don't recalculate it every time getOperatorAffinity() is called (I see some exceptions, which I am going to address later, as this problem was not introduced by this change).
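
For reference, a small sketch of that behavior. It assumes the default simply returns Collections.emptyList(); EmptyAffinitySketch and the String element type are hypothetical. The Javadoc for emptyList() says implementations need not create a separate List object for each call, and the returned list rejects modification.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a default "no affinity known" implementation.
final class EmptyAffinitySketch {

  // Returns the JDK's immutable empty list instead of building a new list per call.
  static List<String> getOperatorAffinity() {
    return Collections.emptyList();
  }

  // Reports whether the list refuses structural modification.
  static boolean rejectsModification(List<String> list) {
    try {
      list.add("node-a");
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    List<String> affinity = getOperatorAffinity();
    System.out.println("empty=" + affinity.isEmpty()
        + " immutable=" + rejectsModification(affinity));
  }
}
```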


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java, line 58
> > <https://reviews.apache.org/r/30466/diff/1/?file=842680#file842680line58>
> >
> >     From these declarations, we can't tell what kind of List senders is; it might be an ArrayList, or it might be a LinkedList. In the latter case, get(i) is expensive, because it will iterate down the list to get to each item. Because of that, we should iterate over that list instead, something like this:
> >     
> >     int i = 0;
> >     for(DrillbitEndpoint de : senders) {
> >       this.senders.put(i, de);
> >       ++i;
> >     }

The reason we are using get(i) is that we want to create a mapping of "i" -> DrillbitEndpoint. The DrillbitEndpoint assigned to minor fragment 3 is at index 3 in the list.


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java, line 131
> > <https://reviews.apache.org/r/30466/diff/1/?file=842681#file842681line131>
> >
> >     "this." isn't necessary here.
> >     
> >     I don't see any code that actually sets this variable.

Removed "this.". The variable is set in AbstractExchange.setupSenders/Receivers().


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java, line 38
> > <https://reviews.apache.org/r/30466/diff/1/?file=842687#file842687line38>
> >
> >     Why don't we make this map final?

changed.


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java, line 46
> > <https://reviews.apache.org/r/30466/diff/1/?file=842687#file842687line46>
> >
> >     This code is the same as that in MergingReceiverPOP (which has one additional different line at its end); should it be pulled up into AbstractReceiver?

Moved it to AbstractReceiver.


- Venki


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review70618
-----------------------------------------------------------




Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.

> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java, line 58
> > <https://reviews.apache.org/r/30466/diff/1/?file=842680#file842680line58>
> >
> >     From these declarations, we can't tell what kind of List senders is; it might be an ArrayList, or it might be a LinkedList. In the latter case, get(i) is expensive, because it will iterate down the list to get to each item. Because of that, we should iterate over that list instead, something like this:
> >     
> >     int i = 0;
> >     for(DrillbitEndpoint de : senders) {
> >       this.senders.put(i, de);
> >       ++i;
> >     }
> 
> Venki Korukanti wrote:
>     The reason we are using get(i) is that we want to create a mapping of "i" -> DrillbitEndpoint. The DrillbitEndpoint assigned to minor fragment 3 is at index 3 in the list.

Changed to use the iteration approach you suggested above, since the iterator returns the elements in list order.
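
A minimal sketch of why the two approaches produce the same mapping: a List iterator visits elements in index order, so a running counter reproduces the index that get(i) would have used, without the per-call traversal cost on a LinkedList. SenderIndexSketch and the String endpoints are hypothetical stand-ins for the receiver code and DrillbitEndpoint.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; String stands in for DrillbitEndpoint.
final class SenderIndexSketch {

  // Maps minor fragment id (the list position) to its endpoint without calling get(i).
  static Map<Integer, String> indexByPosition(List<String> senders) {
    Map<Integer, String> byMinorFragmentId = new LinkedHashMap<>();
    int i = 0;
    for (String endpoint : senders) {
      byMinorFragmentId.put(i, endpoint);
      ++i;
    }
    return byMinorFragmentId;
  }

  public static void main(String[] args) {
    List<String> senders =
        new LinkedList<>(Arrays.asList("node-a", "node-b", "node-c", "node-d"));
    // The endpoint for minor fragment 3 is the element at index 3.
    System.out.println(indexByPosition(senders).get(3));
  }
}
```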


> On Feb. 2, 2015, 10:18 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java, line 62
> > <https://reviews.apache.org/r/30466/diff/1/?file=842687#file842687line62>
> >
> >     How does this get used? Would it be dangerous if the caller modified the map? Should we return a copy?

Changed to return an immutable copy.
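
One way such a getter could look, using only the JDK, is sketched below. ReceiverSketch and getSenders are hypothetical names and String stands in for DrillbitEndpoint; the actual patch may use a different utility to build the copy.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; String stands in for DrillbitEndpoint.
final class ReceiverSketch {
  private final Map<Integer, String> senders = new LinkedHashMap<>();

  ReceiverSketch(Map<Integer, String> senders) {
    // Defensive copy on the way in, so later changes by the caller are not seen.
    this.senders.putAll(senders);
  }

  // Returns a read-only snapshot; callers cannot change the receiver's state through it.
  Map<Integer, String> getSenders() {
    return Collections.unmodifiableMap(new LinkedHashMap<>(senders));
  }

  public static void main(String[] args) {
    Map<Integer, String> source = new LinkedHashMap<>();
    source.put(0, "node-a");
    ReceiverSketch receiver = new ReceiverSketch(source);
    try {
      receiver.getSenders().put(1, "node-b");
    } catch (UnsupportedOperationException e) {
      System.out.println("modification rejected");
    }
  }
}
```

Copying before wrapping matters here: Collections.unmodifiableMap alone returns a read-only view of the same backing map, so without the copy the caller would still observe later internal changes.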


- Venki




On Feb. 4, 2015, 2:24 a.m., Venki Korukanti wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30466/
> -----------------------------------------------------------
> 
> (Updated Feb. 4, 2015, 2:24 a.m.)
> 
> 
> Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.
> 
> 
> Repository: drill-git
> 
> 
> Description
> -------
> 
> In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am debugging). I am sending this patch to get initial feedback.
> 
> Brief overview of changes:
> 1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before the HashToRandomExchangePrel and a DemuxExchange after it.
> 2. Parallelization changes:
>    i) Traverse the physical operator tree and divide it into Fragments.
>    ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
>    iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
> 3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
> 4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
> 
> 
> Diffs
> -----
> 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java 23860a3 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java 52462db 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java 94411ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 79603eb 
>   exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java 4c12d57 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java dc016be 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b0206f7 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b33042b 
>   exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
> 
> Diff: https://reviews.apache.org/r/30466/diff/
> 
> 
> Testing
> -------
> 
> Ran Functional and TPCH SF100 parquet verification tests.
> 
> 
> Thanks,
> 
> Venki Korukanti
> 
>


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Chris Westin <cw...@yahoo.com>.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
<https://reviews.apache.org/r/30466/#comment115906>

    It looks like endpoint should be final.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
<https://reviews.apache.org/r/30466/#comment115909>

    If I created this for a particular endpoint, why would I want to be able to change it?
    
    If I do change it, why doesn't that invalidate the affinity value?



exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
<https://reviews.apache.org/r/30466/#comment115910>

    What does it mean for the two endpoint values not to be equal? How do you expect this to be used?



exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
<https://reviews.apache.org/r/30466/#comment115856>

    Can this condition ever occur? Is it actually possible to reach POSITIVE_INFINITY by adding to the affinity as in addAffinity() (and the condition in there also looks suspicious)?



exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
<https://reviews.apache.org/r/30466/#comment115915>

    Why are these not final (and set in an AbstractExchange constructor)?
    
    In the MuxExchange below, the sender major fragment id is referenced, but has never been set. It looks like these aren't both used. Perhaps they shouldn't be declared in this class, but each class that does need them should declare whichever one it needs itself.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
<https://reviews.apache.org/r/30466/#comment115920>

    This will make a new (empty) list each time it is called. Shouldn't the class be hanging on to its currently known operator affinity list, even an empty one?



exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
<https://reviews.apache.org/r/30466/#comment115896>

    It would be helpful to expand on this comment with some of the reasons these might have affinity with each other. In my mind, they would always want to be together (if possible) to avoid any network transit, so I'm having a hard time with finding a reason they wouldn't want to have affinity.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
<https://reviews.apache.org/r/30466/#comment115903>

    From these declarations, we can't tell what kind of List senders is; it might be an ArrayList, or it might be a LinkedList. In the latter case, get(i) is expensive, because it will iterate down the list to get to each item. Because of that, we should iterate over that list instead, something like this:
    
    int i = 0;
    for(DrillbitEndpoint de : senders) {
      this.senders.put(i, de);
      ++i;
    }



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java
<https://reviews.apache.org/r/30466/#comment115914>

    "this." isn't necessary here.
    
    I don't see any code that actually sets this variable.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
<https://reviews.apache.org/r/30466/#comment115917>

    Why don't we make this map final?



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
<https://reviews.apache.org/r/30466/#comment115918>

    This code is the same as that in MergingReceiverPOP (which has one additional different line at its end); should it be pulled up into AbstractReceiver?



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
<https://reviews.apache.org/r/30466/#comment115919>

    How does this get used? Would it be dangerous if the caller modified the map? Should we return a copy?


- Chris Westin


On Feb. 2, 2015, 7:21 p.m., Venki Korukanti wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30466/
> -----------------------------------------------------------
> 
> (Updated Feb. 2, 2015, 7:21 p.m.)
> 
> 
> Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.
> 
> 
> Repository: drill-git
> 
> 
> Description
> -------
> 
> In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures, which I am debugging). Sending this patch to get initial feedback.
> 
> Brief overview of changes:
> 1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before it and a DemuxExchange after it.
> 2. Parallelization changes:
>    i) Traverse the physical operator tree and divide it into Fragments.
>    ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
>    iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
> 3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
> 4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
> 
> 
> Diffs
> -----
> 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java 23860a3 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java 52462db 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java 94411ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 79603eb 
>   exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java 4c12d57 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java dc016be 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b0206f7 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b33042b 
>   exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
> 
> Diff: https://reviews.apache.org/r/30466/diff/
> 
> 
> Testing
> -------
> 
> Ran Functional and TPCH SF100 parquet verification tests.
> 
> 
> Thanks,
> 
> Venki Korukanti
> 
>


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.

> On Feb. 5, 2015, 11:04 p.m., Jacques Nadeau wrote:
> > PojoRecordReader needs infinite affinity. Please fix this and add a test case with slice target set to 1 and a group by/sort to force exchange insertion, and make sure that infinite affinity works correctly (maybe add a few more cases on just that sub-functionality).
> > 
> > Why don’t we always have a small affinity to the previous node for other exchanges?

TestLocalExchange.java already contains tests for groupby/join on tables that have multiple input files and slice_target set to 1. 

In the new patch, NO_AFFINITY is removed, the default exchange affinity is RECEIVER_AFFINITY_TO_SENDER, and receivers are given affinity to the sender locations.


> On Feb. 5, 2015, 11:04 p.m., Jacques Nadeau wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java, line 68
> > <https://reviews.apache.org/r/30466/diff/3/?file=848072#file848072line68>
> >
> >     This seems misnamed.  Maybe isAffinityRequired()

Renamed to isAssignmentRequired().


> On Feb. 5, 2015, 11:04 p.m., Jacques Nadeau wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java, line 38
> > <https://reviews.apache.org/r/30466/diff/3/?file=848081#file848081line38>
> >
> >     If we are using this all over, let's use IntObjectOpenHashMap<DrillbitEndpoint>

Moved to List<MinorFragmentEndpoint>, where a MinorFragmentEndpoint object contains the fragment id and the Drillbit endpoint.
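
For readers following along, a value type of this shape could look roughly like the sketch below. This is an assumption-laden illustration, not the class from the patch: `FragmentEndpointPair` and its methods are hypothetical names, and `String` stands in for `DrillbitEndpoint`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the MinorFragmentEndpoint described above.
final class FragmentEndpointPair {
  private final int minorFragmentId;
  private final String endpoint; // stand-in for DrillbitEndpoint

  FragmentEndpointPair(int minorFragmentId, String endpoint) {
    this.minorFragmentId = minorFragmentId;
    this.endpoint = endpoint;
  }

  int getMinorFragmentId() { return minorFragmentId; }

  String getEndpoint() { return endpoint; }

  // Pairs each endpoint with an explicit fragment id, so the ids need not be
  // the dense 0..n-1 positions of a plain list.
  static List<FragmentEndpointPair> pair(int[] ids, String[] endpoints) {
    if (ids.length != endpoints.length) {
      throw new IllegalArgumentException("ids and endpoints must have equal length");
    }
    List<FragmentEndpointPair> result = new ArrayList<>(ids.length);
    for (int i = 0; i < ids.length; i++) {
      result.add(new FragmentEndpointPair(ids[i], endpoints[i]));
    }
    return result;
  }
}
```

A list of such pairs keeps iteration cheap and ordered while still carrying the fragment id that a map key would otherwise hold.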


> On Feb. 5, 2015, 11:04 p.m., Jacques Nadeau wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java, line 235
> > <https://reviews.apache.org/r/30466/diff/3/?file=848112#file848112line235>
> >
> >     This naming is a bit misleading. It sounds like it indicates whether or not this will be parallel, but I believe it actually indicates whether we have already made parallelization decisions about this. If the latter, we should give it a more descriptive name.

Renamed to isEndpointAssignmentDone().


> On Feb. 5, 2015, 11:04 p.m., Jacques Nadeau wrote:
> > exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java, line 297
> > <https://reviews.apache.org/r/30466/diff/3/?file=848130#file848130line297>
> >
> >     can we move to common?

Removed this, since Guava's Files class provides APIs for creating temp directories/files in the process temp directory. The test now uses those APIs.


> On Feb. 5, 2015, 11:04 p.m., Jacques Nadeau wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java, line 60
> > <https://reviews.apache.org/r/30466/diff/3/?file=848078#file848078line60>
> >
> >     It seems ParallelizationInfo and these other methods are redundant.  Why do we have both?
> >     
> >     Also wondering why you deleted HasAffinity.  The idea was that any node could have affinity and that we ultimately choose to understand affinity based on that.

Restored the HasAffinity interface; GroupScan and Store continue to use it.


- Venki


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review70841
-----------------------------------------------------------


On Feb. 4, 2015, 4:30 p.m., Venki Korukanti wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30466/
> -----------------------------------------------------------
> 
> (Updated Feb. 4, 2015, 4:30 p.m.)
> 
> 
> Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.
> 
> 
> Repository: drill-git
> 
> 
> Description
> -------
> 
> In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures, which I am debugging). Sending this patch to get initial feedback.
> 
> Brief overview of changes:
> 1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before it and a DemuxExchange after it.
> 2. Parallelization changes:
>    i) Traverse the physical operator tree and divide it into Fragments.
>    ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
>    iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
> 3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
> 4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
> 
> 
> Diffs
> -----
> 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java 23860a3 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java 52462db 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java dfcb113 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java 94411ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 79603eb 
>   exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java 4c12d57 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java dc016be 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b0206f7 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b33042b 
>   exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
> 
> Diff: https://reviews.apache.org/r/30466/diff/
> 
> 
> Testing
> -------
> 
> Ran Functional and TPCH SF100 parquet verification tests.
> 
> 
> Thanks,
> 
> Venki Korukanti
> 
>


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Jacques Nadeau <ja...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review70841
-----------------------------------------------------------


PojoRecordReader needs infinite affinity. Please fix this and add a test case with slice target set to 1 and a group by/sort to force exchange insertion, and make sure that infinite affinity works correctly (maybe add a few more cases on just that sub-functionality).

Why don’t we always have a small affinity to the previous node for other exchanges?


exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
<https://reviews.apache.org/r/30466/#comment116282>

    I'm not sure what this method name means.  Maybe something more like isAffinityRequired()



exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
<https://reviews.apache.org/r/30466/#comment116888>

    This seems misnamed.  Maybe isAffinityRequired()



exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
<https://reviews.apache.org/r/30466/#comment116889>

    Probably this should be a constant rather than constantly recreating the default.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
<https://reviews.apache.org/r/30466/#comment116894>

    It seems ParallelizationInfo and these other methods are redundant.  Why do we have both?
    
    Also wondering why you deleted HasAffinity.  The idea was that any node could have affinity and that we ultimately choose to understand affinity based on that.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
<https://reviews.apache.org/r/30466/#comment116895>

    If we are using this all over, let's use IntObjectOpenHashMap<DrillbitEndpoint>



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java
<https://reviews.apache.org/r/30466/#comment116898>

    IllegalState?



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java
<https://reviews.apache.org/r/30466/#comment116897>

    Why not use an ArrayListMultimap?



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java
<https://reviews.apache.org/r/30466/#comment116912>

    Naming this senderDrillbitLocation is confusing.  This is the location of the receiver.  Yes, you're building a map that is sendendpoint -> [recv, recv] but I still found this confusing.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
<https://reviews.apache.org/r/30466/#comment116984>

    Why did you change to Map?



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
<https://reviews.apache.org/r/30466/#comment116992>

    This naming is a bit misleading. It sounds like it indicates whether or not this will be parallel, but I believe it actually indicates whether we have already made parallelization decisions about this. If the latter, we should give it a more descriptive name.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
<https://reviews.apache.org/r/30466/#comment116994>

    Let's make mux and demux separate configuration options.



exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
<https://reviews.apache.org/r/30466/#comment116995>

    Why are you changing to maps everywhere? Is this because you now have sparse lists? I'm not sure it is worth the change. If you must, let's create an array wrapper class that doesn't do hashing.
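
To make the last suggestion concrete, here is a minimal sketch of an array wrapper keyed by small non-negative ints. The class name `IntIndexedArray` and its API are hypothetical, chosen only for illustration; it assumes ids are small enough that a directly indexed array is affordable.

```java
import java.util.Arrays;

// Hypothetical array-backed lookup keyed by small non-negative ints.
// Lookups index the array directly, so no hashing or boxing of keys occurs.
final class IntIndexedArray<T> {
  private Object[] slots;

  IntIndexedArray(int initialCapacity) {
    slots = new Object[Math.max(1, initialCapacity)];
  }

  // Stores value at index, growing the backing array if needed.
  void put(int index, T value) {
    if (index < 0) {
      throw new IndexOutOfBoundsException("negative index: " + index);
    }
    if (index >= slots.length) {
      slots = Arrays.copyOf(slots, Math.max(index + 1, slots.length * 2));
    }
    slots[index] = value;
  }

  // Returns the value at index, or null if the slot was never set.
  @SuppressWarnings("unchecked")
  T get(int index) {
    if (index < 0 || index >= slots.length) {
      return null;
    }
    return (T) slots[index];
  }
}
```

Unset slots simply stay null, which is how the sketch tolerates sparse ids; the trade-off is wasted space when ids are very sparse.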



exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
<https://reviews.apache.org/r/30466/#comment117010>

    Should this be in basetest?



exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
<https://reviews.apache.org/r/30466/#comment117011>

    can we move this functionality to base test query?



exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
<https://reviews.apache.org/r/30466/#comment117012>

    can we move this to common class?



exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
<https://reviews.apache.org/r/30466/#comment117013>

    can we move to common?


- Jacques Nadeau




Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/
-----------------------------------------------------------

(Updated Feb. 26, 2015, 6:15 p.m.)


Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.


Changes
-------

Addressed review comments by Jacques and Yuliya.


Repository: drill-git


Description
-------

In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures, which I am debugging). Sending this patch to get initial feedback.

Brief overview of changes:
1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before it and a DemuxExchange after it.
2. Parallelization changes:
   i) Traverse the physical operator tree and divide it into Fragments.
   ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
   iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
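
Step 1 of the overview can be sketched on a toy plan tree as shown below. This is only an illustration under assumptions: `PlanNode` is a hypothetical class rather than Drill's Prel hierarchy, operators are identified by plain strings, and "before"/"after" are read in data-flow order (Mux on the input side of the hash exchange, DeMux on its output side).

```java
import java.util.ArrayList;
import java.util.List;

// Toy plan node; hypothetical, not Drill's Prel classes.
final class PlanNode {
  final String op;
  final List<PlanNode> inputs;

  PlanNode(String op, List<PlanNode> inputs) {
    this.op = op;
    this.inputs = inputs;
  }

  static PlanNode of(String op, PlanNode... inputs) {
    List<PlanNode> list = new ArrayList<>();
    for (PlanNode in : inputs) {
      list.add(in);
    }
    return new PlanNode(op, list);
  }

  // Rewrites the tree bottom-up, wrapping every HashToRandomExchange.
  static PlanNode insertLocalExchanges(PlanNode node) {
    List<PlanNode> rewritten = new ArrayList<>();
    for (PlanNode input : node.inputs) {
      rewritten.add(insertLocalExchanges(input));
    }
    if (!node.op.equals("HashToRandomExchange")) {
      return new PlanNode(node.op, rewritten);
    }
    // Mux goes on the input (sending) side of the hash exchange...
    List<PlanNode> muxed = new ArrayList<>();
    for (PlanNode input : rewritten) {
      muxed.add(of("MuxExchange", input));
    }
    PlanNode hash = new PlanNode(node.op, muxed);
    // ...and DeMux on its output (receiving) side.
    return of("DeMuxExchange", hash);
  }

  @Override
  public String toString() {
    return inputs.isEmpty() ? op : op + inputs;
  }
}
```

For example, rewriting `HashAgg` over `HashToRandomExchange` over `Scan` yields a tree in which the exchange sits between a `DeMuxExchange` above it and a `MuxExchange` below it.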


Diffs (updated)
-----

  contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 1152b7b 
  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java 7e3b6c8 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java dfcb113 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractDeMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/AbstractMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java 804671e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java 52b892e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 0ece367 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java 85a7b86 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedDeMuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnorderedMuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 35e7f5c 
  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java aa0a5ad 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java 9902443 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
  exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 378e81a 
  exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java cf99577 
  exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java c52545d 
  exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java bb855c9 
  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java 219e66f 
  exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java PRE-CREATION 
  pom.xml 525579a 

Diff: https://reviews.apache.org/r/30466/diff/


Testing
-------

Ran Functional and TPCH SF100 parquet verification tests.


Thanks,

Venki Korukanti


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Chris Westin <ch...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review73230
-----------------------------------------------------------


No new comments on this revision.

- Chris Westin


On Feb. 17, 2015, 6:32 p.m., Venki Korukanti wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30466/
> -----------------------------------------------------------
> 
> (Updated Feb. 17, 2015, 6:32 p.m.)
> 
> 
> Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.
> 
> 
> Repository: drill-git
> 
> 
> Description
> -------
> 
> In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am debugging). Sending this patch to get initial feedback.
> 
> Brief overview of changes:
> 1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before it and a DemuxExchange after it.
> 2. Parallelization changes:
>    i) Traverse the physical operator tree and divide it into Fragments.
>    ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
>    iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
> 3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
> 4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
> 
> 
> Diffs
> -----
> 
>   contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 1152b7b 
>   contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java 7e3b6c8 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java dfcb113 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java d78ba8e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java 52b892e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 0ece367 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java 85a7b86 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 0ac7c97 
>   exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java aa0a5ad 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java 9902443 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
>   exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 378e81a 
>   exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java 17bcb79 
>   exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java c52545d 
>   exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java bb855c9 
>   exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
>   exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java 219e66f 
>   exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/30466/diff/
> 
> 
> Testing
> -------
> 
> Ran Functional and TPCH SF100 parquet verification tests.
> 
> 
> Thanks,
> 
> Venki Korukanti
> 
>


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/
-----------------------------------------------------------

(Updated Feb. 18, 2015, 2:32 a.m.)


Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.


Repository: drill-git


Description
-------

In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am debugging). Sending this patch to get initial feedback.

Brief overview of changes:
1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before it and a DemuxExchange after it.
2. Parallelization changes:
   i) Traverse the physical operator tree and divide it into Fragments.
   ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
   iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
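
One possible reading of step 2 above, sketched under assumptions (this is not the code in the patch): fragments are plain strings, the map dependsOn and the class ParallelizeOrderSketch are hypothetical, and the dependency graph is assumed to be acyclic. Traversal starts from the fragments that no other fragment depends on, and each fragment is emitted only after the fragments it depends on.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ParallelizeOrderSketch {
    /**
     * dependsOn maps each fragment to the fragments whose parallelization
     * info it needs. Returns an order in which every fragment appears after
     * all of its dependencies.
     */
    static List<String> parallelizationOrder(Map<String, List<String>> dependsOn) {
        // Assume every fragment is a starting point, then drop the ones
        // that some other fragment depends on.
        Set<String> starts = new LinkedHashSet<>(dependsOn.keySet());
        for (List<String> deps : dependsOn.values()) {
            starts.removeAll(deps);
        }
        List<String> order = new ArrayList<>();
        Set<String> done = new HashSet<>();
        for (String fragment : starts) {
            visit(fragment, dependsOn, done, order);
        }
        return order;
    }

    private static void visit(String fragment, Map<String, List<String>> dependsOn,
                              Set<String> done, List<String> order) {
        if (!done.add(fragment)) {
            return; // already handled
        }
        // Handle the dependencies first, then the fragment itself.
        for (String dep : dependsOn.getOrDefault(fragment, Collections.<String>emptyList())) {
            visit(dep, dependsOn, done, order);
        }
        order.add(fragment);
    }
}
```

The real parallelizer would compute width and endpoint assignments at the point where this sketch appends to the order list; only the ordering is modelled here.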


Diffs (updated)
-----

  contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 1152b7b 
  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java 7e3b6c8 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java dfcb113 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java d78ba8e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java 52b892e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 0ece367 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java 85a7b86 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 0ac7c97 
  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java aa0a5ad 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java 9902443 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
  exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 378e81a 
  exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java 17bcb79 
  exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java c52545d 
  exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java bb855c9 
  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java 219e66f 
  exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java PRE-CREATION 

Diff: https://reviews.apache.org/r/30466/diff/


Testing
-------

Ran Functional and TPCH SF100 parquet verification tests.


Thanks,

Venki Korukanti


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.

> On Feb. 17, 2015, 10:33 p.m., Chris Westin wrote:
> > exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java, line 64
> > <https://reviews.apache.org/r/30466/diff/6/?file=860987#file860987line64>
> >
> >     How does this test resizing? You start with initial size 10, but only insert 4 values.

The confusion is over the value passed to the constructor: it indicates the maximum key value, not the number of entries the map has capacity for. I think it is better to start the array with a fixed size and resize as needed, without the client specifying any capacity. Changed the code accordingly.
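
For illustration, a minimal sketch of that approach (not the actual ArrayWrappedIntIntMap; the class name, the initial size of 16, and the Integer.MIN_VALUE "empty" sentinel are all assumptions made for this example): the array is indexed by key, starts at a fixed size, and grows when a larger key is inserted.

```java
import java.util.Arrays;

/** Hypothetical int-to-int map backed by an array indexed by key. */
final class IntIntArrayMapSketch {
    private static final int INITIAL_SIZE = 16;          // assumed starting size
    private static final int EMPTY = Integer.MIN_VALUE;  // assumed "no entry" marker

    private int[] values = new int[INITIAL_SIZE];

    IntIntArrayMapSketch() {
        Arrays.fill(values, EMPTY);
    }

    void put(int key, int value) {
        if (key < 0) {
            throw new IllegalArgumentException("key must be non-negative: " + key);
        }
        if (value == EMPTY) {
            throw new IllegalArgumentException("value is reserved as the empty marker");
        }
        if (key >= values.length) {
            // Grow to at least key + 1 slots; doubling keeps repeated growth cheap.
            int oldSize = values.length;
            int newSize = Math.max(oldSize * 2, key + 1);
            values = Arrays.copyOf(values, newSize);
            Arrays.fill(values, oldSize, newSize, EMPTY);
        }
        values[key] = value;
    }

    int get(int key) {
        if (key < 0 || key >= values.length || values[key] == EMPTY) {
            throw new IllegalArgumentException("no entry for key " + key);
        }
        return values[key];
    }

    int capacity() {
        return values.length;
    }
}
```

With this shape, a test can exercise resizing simply by inserting a key larger than the initial size and checking that earlier entries are still readable.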


- Venki


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review72784
-----------------------------------------------------------


On Feb. 11, 2015, 7:26 p.m., Venki Korukanti wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30466/
> -----------------------------------------------------------
> 
> (Updated Feb. 11, 2015, 7:26 p.m.)
> 
> 
> Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.
> 
> 
> Repository: drill-git
> 
> 
> Description
> -------
> 
> In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am debugging). Sending this patch to get initial feedback.
> 
> Brief overview of changes:
> 1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before it and a DemuxExchange after it.
> 2. Parallelization changes:
>    i) Traverse the physical operator tree and divide it into Fragments.
>    ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
>    iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
> 3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
> 4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
> 
> 
> Diffs
> -----
> 
>   contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 1152b7b 
>   contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java 7e3b6c8 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java dfcb113 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java d78ba8e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java 52b892e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 6b3d301 
>   exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java aa0a5ad 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java 9902443 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
>   exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 378e81a 
>   exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java 17bcb79 
>   exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java c52545d 
>   exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java bb855c9 
>   exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
>   exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java 219e66f 
>   exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/30466/diff/
> 
> 
> Testing
> -------
> 
> Ran Functional and TPCH SF100 parquet verification tests.
> 
> 
> Thanks,
> 
> Venki Korukanti
> 
>


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Chris Westin <ch...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review72784
-----------------------------------------------------------



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java
<https://reviews.apache.org/r/30466/#comment118852>

    Since receiverLocations isn't changed by the loop body, receiverLocations.size() should be stored in a final local before the loop.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java
<https://reviews.apache.org/r/30466/#comment118860>

    You call senderLocations.get(senderFragmentId) twice; call it once and reuse the value.
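
    A minimal sketch of the two suggestions above (read the size once, look each element up once), assuming hypothetical names. Location and describe() are illustrative stand-ins and are not taken from DeMuxExchange.java.

```java
import java.util.ArrayList;
import java.util.List;

class LoopHoistingSketch {
    // Hypothetical stand-in for an endpoint entry; not a Drill class.
    static final class Location {
        final String host;
        final int port;

        Location(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    static List<String> describe(List<Location> receiverLocations) {
        final List<String> out = new ArrayList<>();
        // The list is not modified inside the loop, so its size is read once.
        final int receiverCount = receiverLocations.size();
        for (int id = 0; id < receiverCount; id++) {
            // Look the element up once and reuse the local below.
            final Location location = receiverLocations.get(id);
            out.add(id + "@" + location.host + ":" + location.port);
        }
        return out;
    }
}
```

    The final locals also document that neither the size nor the element is expected to change during the iteration.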



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java
<https://reviews.apache.org/r/30466/#comment118853>

    Preconditions is really for arguments. Here, you should do the test and throw an IllegalStateException().



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java
<https://reviews.apache.org/r/30466/#comment118854>

    As above re Preconditions.
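
    To illustrate the distinction drawn in the two Preconditions comments, here is a sketch in plain Java, assuming a hypothetical class with a width field that is set later. None of these names come from MuxExchange.java.

```java
class StateCheckSketch {
    private Integer width; // null until assign() has been called

    void assign(int newWidth) {
        // The value comes from the caller, so this is an argument check.
        if (newWidth <= 0) {
            throw new IllegalArgumentException("width must be positive: " + newWidth);
        }
        this.width = newWidth;
    }

    int width() {
        // Nothing is wrong with an argument here; the object itself is not
        // ready yet, so the failure is reported as an illegal state.
        if (width == null) {
            throw new IllegalStateException("width requested before assign() was called");
        }
        return width;
    }
}
```

    The exception type then tells the reader whether the caller passed something bad or the call simply came too early.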



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java
<https://reviews.apache.org/r/30466/#comment118855>

    senderLocations.size() should be a final constant before the loop.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java
<https://reviews.apache.org/r/30466/#comment118856>

    receiverLocations.size() should be a final constant before the loop.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java
<https://reviews.apache.org/r/30466/#comment118858>

    You call receiverLocations.get(receiverFragmentId) twice. Get it once and reuse.



exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
<https://reviews.apache.org/r/30466/#comment118880>

    newBuilder() goes on the previous line, because it isn't setting something within the builder, unlike the other lines.



exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
<https://reviews.apache.org/r/30466/#comment118881>

    In tests, you should stick to one condition per assert, so this should be assertNotNull(assignments); assertTrue(assignments.size() > 0);



exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java
<https://reviews.apache.org/r/30466/#comment118883>

    The order for assertEquals() is (expected, actual); please swap these to avoid confusing error messages.



exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java
<https://reviews.apache.org/r/30466/#comment118884>

    Same here: the order is (expected, actual), per http://junit.sourceforge.net/javadoc/org/junit/Assert.html
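
    A small sketch of why the order matters. The assertEquals below is a hypothetical stand-in written for this example (it only imitates the "expected ... but was ..." style of failure message), so the snippet runs without JUnit on the classpath.

```java
class AssertOrderSketch {
    // Hypothetical stand-in following the (expected, actual) parameter order.
    static void assertEquals(int expected, int actual) {
        if (expected != actual) {
            throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    // Runs a check and returns its failure message, or null if it passed.
    static String failureMessage(Runnable check) {
        try {
            check.run();
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        int valueFromMap = 7; // pretend the code under test returned 7 instead of 42
        // Correct order: the message names 42 as the expectation.
        System.out.println(failureMessage(() -> assertEquals(42, valueFromMap)));
        // Swapped order: the message claims 7 was expected, which is misleading.
        System.out.println(failureMessage(() -> assertEquals(valueFromMap, 42)));
    }
}
```

    With the arguments swapped the check still fails when it should, but the message points the reader at the wrong value.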



exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java
<https://reviews.apache.org/r/30466/#comment118886>

    How does this test resizing? You start with initial size 10, but only insert 4 values.
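
    As an illustration of what a resizing test would need to do, here is a sketch against a hypothetical array-backed int-to-int map. GrowableIntIntMapSketch is written for this example only and is an assumption about the general shape of such a map, not the ArrayWrappedIntIntMap from the patch.

```java
import java.util.Arrays;

class GrowableIntIntMapSketch {
    // Marks an unset slot; this sketch therefore cannot store Integer.MIN_VALUE.
    static final int EMPTY = Integer.MIN_VALUE;

    private int[] values;

    GrowableIntIntMapSketch(int initialCapacity) {
        values = new int[initialCapacity];
        Arrays.fill(values, EMPTY);
    }

    void put(int key, int value) {
        if (key < 0) {
            throw new IllegalArgumentException("negative key: " + key);
        }
        if (key >= values.length) {
            // Grow so the key fits, at least doubling the backing array.
            final int oldLength = values.length;
            values = Arrays.copyOf(values, Math.max(key + 1, oldLength * 2));
            Arrays.fill(values, oldLength, values.length, EMPTY);
        }
        values[key] = value;
    }

    int get(int key) {
        return (key < 0 || key >= values.length) ? EMPTY : values[key];
    }

    int capacity() {
        return values.length;
    }
}
```

    A test that exercises growth would start at capacity 10, insert a key of 10 or more, and then check both the new key and the keys inserted before the resize. Inserting only four small keys never reaches the growth branch.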


- Chris Westin




Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/
-----------------------------------------------------------

(Updated Feb. 11, 2015, 7:26 p.m.)


Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.


Changes
-------

Added ArrayWrappedIntIntMap to speed up get calls.
Added more test coverage.


Repository: drill-git


Description
-------

In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am still debugging). I am sending this patch to get initial feedback.

Brief overview of changes:
1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before the HashToRandomExchangePrel and a DemuxExchange after it.
2. Parallelization changes:
   i) Traverse the physical operator tree and divide it into Fragments.
   ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
   iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
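
Item 1 above can be sketched roughly as follows. This is an illustration only: Node, rewrite() and render() are hypothetical names, the plan is modeled as a simple tree of named nodes, and "before"/"after" are assumed to refer to data-flow order (the mux sits on the input side of the hash exchange, the demux on its output side). The actual visitor in the patch may be structured differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class LocalExchangeInsertionSketch {
    // Hypothetical plan node: a name plus the inputs that feed data into it.
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, List<Node> inputs) {
            this.name = name;
            this.inputs = inputs;
        }
    }

    // Rebuilds the tree bottom-up, wrapping every HashToRandomExchange.
    static Node rewrite(Node node) {
        final List<Node> newInputs = new ArrayList<>();
        for (Node input : node.inputs) {
            newInputs.add(rewrite(input));
        }
        if (!node.name.equals("HashToRandomExchange")) {
            return new Node(node.name, newInputs);
        }
        // Input side: each input passes through a MuxExchange first.
        final List<Node> muxed = new ArrayList<>();
        for (Node input : newInputs) {
            muxed.add(new Node("MuxExchange", Collections.singletonList(input)));
        }
        final Node hash = new Node(node.name, muxed);
        // Output side: the hash exchange feeds a DeMuxExchange.
        return new Node("DeMuxExchange", Collections.singletonList(hash));
    }

    // Renders the tree as name(input, input, ...) for inspection.
    static String render(Node node) {
        if (node.inputs.isEmpty()) {
            return node.name;
        }
        final StringBuilder sb = new StringBuilder(node.name).append('(');
        for (int i = 0; i < node.inputs.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(render(node.inputs.get(i)));
        }
        return sb.append(')').toString();
    }
}
```

For a plan Screen(HashToRandomExchange(Scan)), this sketch produces Screen(DeMuxExchange(HashToRandomExchange(MuxExchange(Scan)))).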


Diffs (updated)
-----

  contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 1152b7b 
  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java 7e3b6c8 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java dfcb113 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java d78ba8e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java 52b892e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 6b3d301 
  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java aa0a5ad 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java 9902443 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
  exec/java-exec/src/main/java/org/apache/drill/exec/util/ArrayWrappedIntIntMap.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 378e81a 
  exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java 17bcb79 
  exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java c52545d 
  exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java bb855c9 
  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java 219e66f 
  exec/java-exec/src/test/java/org/apache/drill/exec/util/TestArrayWrappedIntIntMap.java PRE-CREATION 

Diff: https://reviews.apache.org/r/30466/diff/


Testing
-------

Ran Functional and TPCH SF100 parquet verification tests.


Thanks,

Venki Korukanti


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/
-----------------------------------------------------------

(Updated Feb. 10, 2015, 2:40 a.m.)


Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.


Changes
-------

Addressed remaining review comments.


Repository: drill-git


Description
-------

In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am still debugging). I am sending this patch to get initial feedback.

Brief overview of changes:
1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before the HashToRandomExchangePrel and a DemuxExchange after it.
2. Parallelization changes:
   i) Traverse the physical operator tree and divide it into Fragments.
   ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
   iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.


Diffs (updated)
-----

  contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 1152b7b 
  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java 7e3b6c8 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java dfcb113 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java d78ba8e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java 52b892e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 6b3d301 
  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java 9902443 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 378e81a 
  exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java 17bcb79 
  exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java c52545d 
  exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java bb855c9 
  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java 219e66f 

Diff: https://reviews.apache.org/r/30466/diff/


Testing
-------

Ran Functional and TPCH SF100 parquet verification tests.


Thanks,

Venki Korukanti


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/
-----------------------------------------------------------

(Updated Feb. 9, 2015, 3:05 p.m.)


Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.


Repository: drill-git


Description
-------

In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am still debugging). I am sending this patch to get initial feedback.

Brief overview of changes:
1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before the HashToRandomExchangePrel and a DemuxExchange after it.
2. Parallelization changes:
   i) Traverse the physical operator tree and divide it into Fragments.
   ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
   iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.


Diffs (updated)
-----

  contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java 1152b7b 
  contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/HiveTestBase.java 7e3b6c8 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/MinorFragmentEndpoint.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java dfcb113 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java d78ba8e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java 52b892e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 6b3d301 
  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java 9902443 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 378e81a 
  exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java 17bcb79 
  exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java c52545d 
  exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassTransformation.java bb855c9 
  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
  exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java 219e66f 

Diff: https://reviews.apache.org/r/30466/diff/


Testing
-------

Ran Functional and TPCH SF100 parquet verification tests.


Thanks,

Venki Korukanti


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.

> On Feb. 4, 2015, 11:37 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java, line 257
> > <https://reviews.apache.org/r/30466/diff/1/?file=842694#file842694line257>
> >
> >     Ok. If we use a custom Comparator here, as I suggested above in the comments about compareTo() (which looks very weird with only some members, which is why I suggested removing it), then we can reverse the ordering in the Comparator, and skip the reverse().

Updated to use a custom Comparator here and removed the Comparable implementation from EndpointAffinity.
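
For readers without the diff at hand, the suggestion can be sketched roughly as follows. This is only an illustration, not the code in the patch: AffinitySketch is a hypothetical stand-in for EndpointAffinity (with a String in place of the endpoint), and BY_AFFINITY_DESC and endpointsByAffinity are made-up names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for EndpointAffinity; it does not implement Comparable.
final class AffinitySketch {
  final String endpoint;   // final, as suggested in the review
  final float affinity;

  AffinitySketch(String endpoint, float affinity) {
    this.endpoint = endpoint;
    this.affinity = affinity;
  }

  // Orders by affinity, highest first, so no separate reverse() pass is needed.
  static final Comparator<AffinitySketch> BY_AFFINITY_DESC =
      new Comparator<AffinitySketch>() {
        @Override
        public int compare(AffinitySketch a, AffinitySketch b) {
          return Float.compare(b.affinity, a.affinity);
        }
      };

  // Returns endpoint names sorted by descending affinity; the input list is not modified.
  static List<String> endpointsByAffinity(List<AffinitySketch> input) {
    List<AffinitySketch> sorted = new ArrayList<>(input);
    Collections.sort(sorted, BY_AFFINITY_DESC);
    List<String> names = new ArrayList<>();
    for (AffinitySketch a : sorted) {
      names.add(a.endpoint);
    }
    return names;
  }
}
```

Keeping the ordering in a Comparator at the call site avoids a compareTo() that looks at only some of the class members.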


> On Feb. 4, 2015, 11:37 p.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java, line 45
> > <https://reviews.apache.org/r/30466/diff/1/?file=842669#file842669line45>
> >
> >     Great. In that case, endpoint should be final.

Already converted to final in v2 patch.


- Venki


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review71077
-----------------------------------------------------------




Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Chris Westin <ch...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review71077
-----------------------------------------------------------



exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
<https://reviews.apache.org/r/30466/#comment116640>

    Great. In that case, endpoint should be final.



exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
<https://reviews.apache.org/r/30466/#comment116645>

    Ok, in that case, I would remove this, and instead provide a custom Comparator at the site where sort() is called.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment116651>

    Ok. If we use a custom Comparator here, as I suggested above in the comments about compareTo() (which looks very weird with only some members, which is why I suggested removing it), then we can reverse the ordering in the Comparator, and skip the reverse().


- Chris Westin




Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/
-----------------------------------------------------------

(Updated Feb. 4, 2015, 4:30 p.m.)


Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.


Changes
-------

Includes cleanup in AbstractReceiver and its subclasses. Also addressed a few review comments.


Repository: drill-git


Description
-------

In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am debugging). Sending this patch to get initial feedback.

Brief overview of changes:
1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before it and a DemuxExchange after it.
2. Parallelization changes:
   i) Traverse the physical operator tree and divide it into Fragments.
   ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
   iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
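
One way to read item 2 is as a dependency-ordered traversal: begin at the fragments that no other fragment depends on, and parallelize whatever a fragment depends on before the fragment itself. The sketch below assumes that reading and is not the patch's code. ParallelizationOrderSketch and its methods are hypothetical names, fragments are plain strings, every fragment is assumed to appear as a key in the map, and the dependency graph is assumed to be acyclic.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ParallelizationOrderSketch {

  // dependsOn maps each fragment to the fragments whose parallelization info it needs.
  static List<String> order(Map<String, List<String>> dependsOn) {
    // Starting points: fragments that no other fragment depends on.
    Set<String> starts = new LinkedHashSet<>(dependsOn.keySet());
    for (List<String> deps : dependsOn.values()) {
      starts.removeAll(deps);
    }

    List<String> done = new ArrayList<>();
    Set<String> visited = new HashSet<>();
    for (String fragment : starts) {
      visit(fragment, dependsOn, visited, done);
    }
    return done;
  }

  // Handles the dependencies of a fragment first, then the fragment itself.
  private static void visit(String fragment, Map<String, List<String>> dependsOn,
                            Set<String> visited, List<String> done) {
    if (!visited.add(fragment)) {
      return; // already handled
    }
    List<String> deps = dependsOn.get(fragment);
    if (deps != null) {
      for (String dep : deps) {
        visit(dep, dependsOn, visited, done);
      }
    }
    done.add(fragment);
  }
}
```

The returned list is the order in which fragments would be assigned their width and endpoints under this reading.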


Diffs (updated)
-----

  exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java 23860a3 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java 52462db 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperatorUtil.java dfcb113 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java 94411ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 79603eb 
  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java 4c12d57 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java dc016be 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b0206f7 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b33042b 
  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 

Diff: https://reviews.apache.org/r/30466/diff/


Testing
-------

Ran Functional and TPCH SF100 parquet verification tests.


Thanks,

Venki Korukanti


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/
-----------------------------------------------------------

(Updated Feb. 4, 2015, 2:24 a.m.)


Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.


Changes
-------

Added demux and new tests. Also addressed some review comments.


Repository: drill-git


Description
-------

In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am still debugging). Sending this patch to get initial feedback.

Brief overview of changes:
1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before the HashToRandomExchangePrel and a DeMuxExchange after it.
2. Parallelization changes:
   i) Traverse the physical operator tree and divide it into Fragments.
   ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
   iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
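
For readers who want to picture the ordering in item 2, here is a minimal sketch, not the code from the patch. It assumes fragments are identified by plain strings and that the dependency graph is acyclic; the class name ParallelizationOrderSketch and the dependsOn map are hypothetical. It starts from the fragments that nothing else depends on and handles each fragment's dependencies before the fragment itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: orders fragments so each one follows the fragments it depends on. */
final class ParallelizationOrderSketch {

  /**
   * @param dependsOn maps each fragment name to the fragments whose parallelization
   *                  info it needs; assumed to be acyclic
   * @return fragment names in an order where dependencies come first
   */
  static List<String> order(Map<String, List<String>> dependsOn) {
    // Starting points: fragments that no other fragment depends on.
    Set<String> startPoints = new LinkedHashSet<>(dependsOn.keySet());
    for (List<String> deps : dependsOn.values()) {
      startPoints.removeAll(deps);
    }

    // Insertion-ordered set doubles as the "already parallelized" marker.
    Set<String> done = new LinkedHashSet<>();
    for (String fragment : startPoints) {
      visit(fragment, dependsOn, done);
    }
    return new ArrayList<>(done);
  }

  private static void visit(String fragment, Map<String, List<String>> dependsOn, Set<String> done) {
    if (done.contains(fragment)) {
      return;
    }
    // Parallelize the dependencies first, then the fragment itself.
    for (String dep : dependsOn.getOrDefault(fragment, Collections.<String>emptyList())) {
      visit(dep, dependsOn, done);
    }
    done.add(fragment);
  }
}
```

In a real planner the "visit" step would also compute the width and endpoint assignment; the sketch only shows the ordering.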


Diffs (updated)
-----

  exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java f621a26 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSender.java 53a0721 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java 23860a3 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java 52462db 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Sender.java bbd1b2c 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java 94411ea 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastSender.java 1827367 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/DeMuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashPartitionSender.java bdb1362 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionSender.java 0a2b9be 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java c8c8f43 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java 22fa047 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java f09acaa 
  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java 4292c09 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DeMuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 79603eb 
  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectGroupScan.java aa1609d 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java 8335ed9 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockGroupScanPOP.java 5736df8 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java 4c12d57 
  exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java 053f5de 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java dc016be 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b0206f7 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b33042b 
  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java 9a32ff9 
  exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 

Diff: https://reviews.apache.org/r/30466/diff/


Testing
-------

Ran Functional and TPCH SF100 parquet verification tests.


Thanks,

Venki Korukanti


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.

> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java, line 240
> > <https://reviews.apache.org/r/30466/diff/1/?file=842697#file842697line240>
> >
> >     Would it be dangerous for the caller to modify this?

Changed to return an immutable copy.
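
The getter under discussion is not shown in this thread, so the following is only a sketch of the general pattern, with a hypothetical class name, field name, and element type. It uses plain JDK collections to hand out a read-only snapshot, so callers cannot change the internal list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a class that exposes an internal list through a getter. */
final class WrapperSketch {
  private final List<String> assignedEndpoints = new ArrayList<>();

  void assign(String endpoint) {
    assignedEndpoints.add(endpoint);
  }

  /** Returns a read-only snapshot; later internal changes do not show up in it. */
  List<String> getAssignedEndpoints() {
    // Copy first, then wrap: the copy detaches the result from the field,
    // and the wrapper rejects add/remove/set calls from the caller.
    return Collections.unmodifiableList(new ArrayList<>(assignedEndpoints));
  }
}
```

Any mutating call on the returned list throws UnsupportedOperationException, which makes accidental modification by a caller fail fast.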


- Venki


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review70639
-----------------------------------------------------------




Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Venki Korukanti <ve...@gmail.com>.

> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java, line 38
> > <https://reviews.apache.org/r/30466/diff/1/?file=842691#file842691line38>
> >
> >     This class has no member variables? Perhaps the constructor should be private so that no one else creates more instances, and they are always forced to use the singleton?

Added private constructor.
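
As an illustration of the suggestion (a sketch only; the class and method names are hypothetical and this is not the patched Materializer), a stateless class can be restricted to a single shared instance like this:

```java
/** Hypothetical stateless visitor-style class that is only reachable through INSTANCE. */
final class MaterializerSketch {
  static final MaterializerSketch INSTANCE = new MaterializerSketch();

  // Private constructor: code outside this class cannot create further instances.
  private MaterializerSketch() {
  }

  String describe(String operatorName) {
    return "visited " + operatorName;
  }
}
```

Callers then write MaterializerSketch.INSTANCE.describe(...) instead of constructing their own object.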


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java, line 32
> > <https://reviews.apache.org/r/30466/diff/1/?file=842692#file842692line32>
> >
> >     It looks like the affinityMap can be final.

Changed.


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java, line 30
> > <https://reviews.apache.org/r/30466/diff/1/?file=842693#file842693line30>
> >
> >     It looks like the fragmentMap should be final.

Changed.


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java, line 77
> > <https://reviews.apache.org/r/30466/diff/1/?file=842694#file842694line77>
> >
> >     Put the result of context.getOptions() into a local and reuse that throughout this function.

Changed.


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java, line 204
> > <https://reviews.apache.org/r/30466/diff/1/?file=842694#file842694line204>
> >
> >     Put stats.getParallelizationInfo() into a local, and then reuse that throughout the rest of this function.

Changed.


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java, line 257
> > <https://reviews.apache.org/r/30466/diff/1/?file=842694#file842694line257>
> >
> >     Why does this need to be sorted? What is the sort key?

We want to order the EndpointAffinity entries in descending order of affinity value, so that DrillbitEndpoints with higher affinity are picked first when selecting endpoints.
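
A minimal sketch of that ordering, assuming a simplified Affinity class with a String endpoint and a double value (both hypothetical stand-ins, not the real EndpointAffinity API):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

final class AffinitySortSketch {

  /** Hypothetical, simplified stand-in for an endpoint/affinity pair. */
  static final class Affinity {
    final String endpoint;
    final double value;

    Affinity(String endpoint, double value) {
      this.endpoint = endpoint;
      this.value = value;
    }
  }

  /** Returns a new list ordered from highest to lowest affinity value. */
  static List<Affinity> sortedByDescendingAffinity(Collection<Affinity> affinities) {
    List<Affinity> sorted = new ArrayList<>(affinities);
    // The sort key is the affinity value; reversed() puts the largest value first.
    sorted.sort(Comparator.comparingDouble((Affinity a) -> a.value).reversed());
    return sorted;
  }
}
```

Because the input is copied before sorting, the caller's collection is left untouched.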


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java, line 261
> > <https://reviews.apache.org/r/30466/diff/1/?file=842694#file842694line261>
> >
> >     This is a little unusual, because most code would normally not expect the size of a collection to change like this, and many would use
> >     final int count = endpoints.size();
> >     while(count < ....) {
> >       ...
> >       }
> >     A comment here would be really helpful, something like "keep adding endpoints until we have the same number as the slot count."

Added a comment.
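
For context, the loop shape in question looks roughly like the sketch below. It is an assumption-laden illustration, not the patched code: the class name, the String endpoints, and the round-robin choice are all hypothetical. The point is that the loop condition deliberately re-reads the size of a list that grows on every iteration.

```java
import java.util.ArrayList;
import java.util.List;

final class EndpointFillSketch {

  /** Picks endpoints from the candidates, cycling through them, until width slots are filled. */
  static List<String> fill(List<String> candidates, int width) {
    if (candidates.isEmpty() && width > 0) {
      throw new IllegalArgumentException("no candidate endpoints available");
    }
    List<String> endpoints = new ArrayList<>(width);
    int next = 0;
    // Keep adding endpoints until we have the same number as the slot count.
    // endpoints.size() changes inside the loop, so it is not cached in a local.
    while (endpoints.size() < width) {
      endpoints.add(candidates.get(next % candidates.size()));
      next++;
    }
    return endpoints;
  }
}
```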


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java, line 59
> > <https://reviews.apache.org/r/30466/diff/1/?file=842697#file842697line59>
> >
> >     Can this be final?

Changed.


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java, line 63
> > <https://reviews.apache.org/r/30466/diff/1/?file=842697#file842697line63>
> >
> >     Can this be final?

Changed.


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java, line 223
> > <https://reviews.apache.org/r/30466/diff/1/?file=842697#file842697line223>
> >
> >     this. isn't necessary here.

Changed.


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java, line 57
> > <https://reviews.apache.org/r/30466/diff/1/?file=842704#file842704line57>
> >
> >     Doesn't need this.

Changed.


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java, line 52
> > <https://reviews.apache.org/r/30466/diff/1/?file=842710#file842710line52>
> >
> >     What is this commented out query? Should it just be deleted?

Removed. Also in the v2 patch, this class has more tests.


> On Feb. 3, 2015, 12:03 a.m., Chris Westin wrote:
> > exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java, line 59
> > <https://reviews.apache.org/r/30466/diff/1/?file=842710#file842710line59>
> >
> >     What is this commented out line?

Removed.


- Venki


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review70639
-----------------------------------------------------------


On Feb. 2, 2015, 7:21 p.m., Venki Korukanti wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30466/
> -----------------------------------------------------------
> 
> (Updated Feb. 2, 2015, 7:21 p.m.)
> 
> 
> Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.
> 
> 
> Repository: drill-git
> 
> 
> Description
> -------
> 
> In this patch, LocalExchange contains only the multiplexing exchange. I am currently working on the demultiplexing exchange (there are a few execution failures that I am still debugging). Sending this patch to get initial feedback.
> 
> Brief overview of changes:
> 1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered, insert a MuxExchange before the HashToRandomExchangePrel and a DeMuxExchange after it.
> 2. Parallelization changes:
>    i) Traverse the physical operator tree and divide it into Fragments.
>    ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments.
>    iii) Start parallelizing from the leaf fragments (fragments that have no other fragments depending on them for parallelization info). Stats collection includes collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
> 3. Change the Receiver to accept a set of (minorFragmentId, DrillbitEndpoint) pairs as the sender list. This also involved a few changes in DataCollector.
> 4. Change SingleSender to accept a custom minorFragmentId instead of the default minorFragmentId of 0.
> 
> 
> Diffs
> -----
> 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java 23860a3 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java 52462db 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java 94411ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 79603eb 
>   exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java 4c12d57 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java dc016be 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b0206f7 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b33042b 
>   exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
> 
> Diff: https://reviews.apache.org/r/30466/diff/
> 
> 
> Testing
> -------
> 
> Ran Functional and TPCH SF100 parquet verification tests.
> 
> 
> Thanks,
> 
> Venki Korukanti
> 
>


Re: Review Request 30466: DRILL-133: LocalExchange (planning and parallelization)

Posted by Chris Westin <cw...@yahoo.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30466/#review70639
-----------------------------------------------------------



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
<https://reviews.apache.org/r/30466/#comment115923>

    But addOperator() above will silently drop this on the floor if root is already set. Should that give an error similar to the above?



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
<https://reviews.apache.org/r/30466/#comment115924>

    This class has no member variables? Perhaps the constructor should be private so that no one else creates more instances, and they are always forced to use the singleton?



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java
<https://reviews.apache.org/r/30466/#comment115925>

    It looks like the affinityMap can be final.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
<https://reviews.apache.org/r/30466/#comment115926>

    It looks like the fragmentMap should be final.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115927>

    Put the result of context.getOptions() into a local and reuse that throughout this function.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115928>

    Given how you're using it, roots should be a HashSet.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115930>

    Put stats.getParallelizationInfo() into a local, and then reuse that throughout the rest of this function.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115931>

    Move the declaration of this list down to just before the while() loop where it is used, below.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115932>

    Why do you create the affinedEPs copy of the endpointAffinityMap values, when you could just use endpointAffinityMap.values() directly here?



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115933>

    Why does this need to be sorted? What is the sort key?



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115934>

    This is a little unusual, because most code would normally not expect the size of a collection to change like this, and many would use
    final int count = endpoints.size();
    while(count < ....) {
      ...
      }
    A comment here would be really helpful, something like "keep adding endpoints until we have the same number as the slot count."



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115935>

    Removing random items from a list is going to cause a list traversal for each item in the list. In this case, it would be better to create "all" as an empty list, and then iterate over activeEndpoints, and only add it to all if it is not in the endpointAffinityMap() (which is a hash lookup to test).
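
A sketch of the alternative described in this comment, under stated assumptions: endpoints are plain strings, the affinity map is keyed by endpoint, and the class and method names are hypothetical. Instead of copying every active endpoint and then removing entries one at a time, it builds the list by filtering with a hash lookup.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

final class EndpointFilterSketch {

  /** Returns the active endpoints that have no entry in the affinity map. */
  static List<String> withoutAffinity(Collection<String> activeEndpoints,
                                      Map<String, Double> endpointAffinityMap) {
    // Start empty and add only what is needed, rather than removing from a full copy.
    List<String> all = new ArrayList<>();
    for (String endpoint : activeEndpoints) {
      // containsKey on a HashMap is a hash lookup, not a list traversal.
      if (!endpointAffinityMap.containsKey(endpoint)) {
        all.add(endpoint);
      }
    }
    return all;
  }
}
```

With a HashMap as the affinity map, each membership test is expected constant time, so the whole pass is linear in the number of active endpoints.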



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115936>

    Don't create this empty list if we don't need it.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115937>

    Don't create this empty list if we don't need it.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
<https://reviews.apache.org/r/30466/#comment115939>

    I don't understand what this loop is doing. It seems to leave sendingEndpoints set to the last parallelized sendingFragment's endpoints. What is significant about the last one it finds?



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
<https://reviews.apache.org/r/30466/#comment115951>

    Can this be final?



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
<https://reviews.apache.org/r/30466/#comment115952>

    Can this be final?



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
<https://reviews.apache.org/r/30466/#comment115949>

    The "this." qualifier isn't necessary here.



exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
<https://reviews.apache.org/r/30466/#comment115950>

    Would it be dangerous for the caller to modify this?
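
    If it would be dangerous, one common option is to hand out a read-only view. This is only a sketch under the assumption that the method returns an internal list; FragmentHolder, assign and getAssignedEndpoints are hypothetical names, not the ones in Wrapper.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical class illustrating a read-only getter; not Wrapper itself.
class FragmentHolder {
  private final List<String> assignedEndpoints = new ArrayList<>();

  void assign(String endpoint) {
    assignedEndpoints.add(endpoint);
  }

  // Callers can read the list but cannot modify it; any attempt to add or
  // remove throws UnsupportedOperationException.
  List<String> getAssignedEndpoints() {
    return Collections.unmodifiableList(assignedEndpoints);
  }
}
```

    The view still reflects later changes made through assign(); if callers need a stable snapshot, returning a copy would be the alternative.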



exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
<https://reviews.apache.org/r/30466/#comment115953>

    Doesn't need this.



exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
<https://reviews.apache.org/r/30466/#comment115959>

    What is this commented-out query? Should it just be deleted?



exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
<https://reviews.apache.org/r/30466/#comment115960>

    What is this commented-out line?


- Chris Westin


On Feb. 2, 2015, 7:21 p.m., Venki Korukanti wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30466/
> -----------------------------------------------------------
> 
> (Updated Feb. 2, 2015, 7:21 p.m.)
> 
> 
> Review request for drill, Chris Westin, Jacques Nadeau, and Steven Phillips.
> 
> 
> Repository: drill-git
> 
> 
> Description
> -------
> 
> In this patch LocalExchange contains only multiplexing exchange. Currently working on demultiplexing exchange (there are few failures in executions and currently debugging those). Sending this patch to get initial feedback.
> 
> Brief overview of changes:
> 1. Traverse the PRel tree after all optimizations. Whenever a HashToRandomExchangePrel is encountered insert a MuxExchange before HashToRandomExchangePrel and DemuxExchange after HashToRandomExchangePrel.
> 2. Parallelization changes: 
>    i) Traverse the physical operator tree and divide it into Fragments.
>    ii) Based on the affinity of the sending Exchange, set parallelization dependencies between fragments. 
>    iii) Start parallelizing from the leaf fragments (fragment that have no other fragments depending on them for parallelization info). Stats collection include collecting parallelization info (minWidth, maxWidth, affinityMap) and cost.
> 3. Change the Receiver to accept set of (minorFragmentId, DrillbitEndpoint) as sender list. This also involved few changes in DataCollector.
> 4. Change SingleSender to accept custom minorFragmentId instead of default minorFragmentId of 0
> 
> 
> Diffs
> -----
> 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java df31f74 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java 73280ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java 5d0d9bf 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java 7be7f20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java 23860a3 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/HasAffinity.java 52462db 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java 0c67770 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Store.java 94411ea 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java 73a1d20 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java f62d922 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java fac374b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java f5dca1a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MuxExchange.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java 8e1526a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java 58c8e29 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java 2914112 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java 4a11a51 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java cfc21ac 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java 3a4dd0e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 6db9f4a 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java ac63bde 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java 8756e5b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java 961b603 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/ParallelizationInfo.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java 8cc6c85 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java 434cdd4 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Stats.java eda364b 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java 41ff678 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java 86b395e 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MuxExchangePrel.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java faa8546 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/InsertLocalExchangeVisitor.java PRE-CREATION 
>   exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java 79603eb 
>   exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java f20627d 
>   exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorePOP.java 4c12d57 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java c83106c 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java dc016be 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b0206f7 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/MergingCollector.java ce14260 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/PartitionedCollector.java 5190d84 
>   exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b33042b 
>   exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java PRE-CREATION 
>   exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java 6349b76 
> 
> Diff: https://reviews.apache.org/r/30466/diff/
> 
> 
> Testing
> -------
> 
> Ran Functional and TPCH SF100 parquet verification tests.
> 
> 
> Thanks,
> 
> Venki Korukanti
> 
>