Posted to dev@tajo.apache.org by Min Zhou <co...@gmail.com> on 2014/02/13 03:00:12 UTC

Question about disk-aware scheduling in tajo

Hi all,

Tajo leverages the feature introduced by HDFS-3672, which exposes the disk
volume id of each HDFS data block. I have already found the related code in
DefaultTaskScheduler.assignToLeafTasks; can anyone explain the logic to me?
What does the scheduler do when the HDFS read is a remote read from another
machine's disk?


Thanks,
Min
-- 
My research interests are distributed systems, parallel computing and
bytecode based virtual machine.

My profile:
http://www.linkedin.com/in/coderplay
My blog:
http://coderplay.javaeye.com

Re: Question about disk-aware scheduling in tajo

Posted by jinho kim <jh...@gruter.com>.
Hi Min,

Here are my comments inline.
If you have any questions, please let me know.

Thanks.


> Hi Jihoon,
> 
> Thank you for you answer. However, seem you didn't answer that how tajo use
> disk information to balance the io overhead.
> 
> And still can't understand the details,  quite complex to me, especially
> the class TaskBlockLocation
> 
> 
> public static class TaskBlockLocation {
>    // This is a mapping from diskId to a list of pending task, right?
>    private HashMap<Integer, LinkedList<QueryUnitAttemptId>>
> unAssignedTaskMap =
Yes, at initialization time this is the list of pending tasks for each disk of a host. For example:

hostA (TaskBlockLocation) ------ Disk1 ---- Task1 (HDFS replica 1), Task2
                            |
                            ---- Disk2 ---- Task3, Task4

hostB --------------------------- Disk1 ---- Task1 (HDFS replica 2), Task3
                            |
                            ---- Disk2 ---- Task2, Task4


>        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>   // How can I return a Task to the container according to the diskId?
>    private HashMap<ContainerId, Integer> assignedContainerMap = new
> HashMap<ContainerId, Integer>();

At runtime, when an available container (containers are unique per host) requests a task, it is assigned the disk with the lowest concurrency.

>    private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer,
> Integer>();
>    private String host;
> 
>    public TaskBlockLocation(String host){
>      this.host = host;
>    }
> 
>    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId
> attemptId){
>      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
>      if (list == null) {
>        list = new LinkedList<QueryUnitAttemptId>();
>        unAssignedTaskMap.put(volumeId, list);
>      }
>      list.add(attemptId);
> 
>      if(!volumeUsageMap.containsKey(volumeId))
> volumeUsageMap.put(volumeId, 0);
>    }
> 
>    public LinkedList<QueryUnitAttemptId>
> getQueryUnitAttemptIdList(ContainerId containerId){
>      Integer volumeId;
> 
>      if (!assignedContainerMap.containsKey(containerId)) {
>        // assign a new container to a volume with the lowest concurrency,
> right?
>        volumeId = assignVolumeId();

Yes, it is.

>        assignedContainerMap.put(containerId, volumeId);
>      } else {
>        volumeId = assignedContainerMap.get(containerId);
>      }
> 
>      LinkedList<QueryUnitAttemptId> list = null;
>      if (unAssignedTaskMap.size() >  0) {
>        int retry = unAssignedTaskMap.size();
>        do {
>          list = unAssignedTaskMap.get(volumeId);
>          if (list == null || list.size() == 0) {
>            //clean and reassign remaining volume
>            unAssignedTaskMap.remove(volumeId);
>            volumeUsageMap.remove(volumeId);
>            if (volumeId < 0) break; //  processed all block on disk
> 
>            // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
>            volumeId = assignVolumeId();
>            // WHY THIS LINE PUT AGAIN?
>            // if the container is a new container, does it put twice??
>            assignedContainerMap.put(containerId, volumeId);

Case 1:
	There are more disks than containers, so a single container may end up serving more than one disk.
        ex)  host ---- container1 ---- disk1 ---- tasks
                                  |
                                  ---- disk2 ---- tasks

Case 2:
	The container is reassigned the unknown disk (-1), which holds the remote tasks, because all local blocks on its disk have already been processed.
	If such a container were never assigned a remote task, the remaining tasks could never finish and the query would deadlock. (A minimal sketch of this fallback is below.)
        ex)
		hostA ---- container1 ---- disk1 (reassigned to -1) ---- remote pending tasks (zero local pending tasks)
		      |
		      ---- container2 ---- disk2 ---- tasks

		hostB ---- container1 ---- disk1 ---- tasks (its pending-task count will decrease)
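
Below is a minimal, self-contained sketch of the Case 2 fallback. This is not
Tajo code, and every name in it (PendingTasks, nextTask, pickVolume, ...) is
made up for illustration. It only shows the idea: once a container's local
volumes are drained, it is re-assigned to the pseudo volume -1, where the
remote pending tasks live, so the remaining tasks can still be drained instead
of deadlocking.

import java.util.*;

public class PendingTasks {
  // volumeId -> pending task ids; volume -1 holds the remote (non-local) tasks
  private final Map<Integer, Deque<String>> unassigned = new HashMap<>();
  // containerId -> volume currently assigned to that container
  private final Map<String, Integer> containerVolume = new HashMap<>();

  void add(int volumeId, String taskId) {
    unassigned.computeIfAbsent(volumeId, k -> new ArrayDeque<>()).add(taskId);
  }

  // Returns the next task for the container, falling back to volume -1 at the end.
  String nextTask(String containerId) {
    Integer assigned = containerVolume.get(containerId);
    int volumeId = (assigned != null) ? assigned : pickVolume();
    while (true) {
      Deque<String> tasks = unassigned.get(volumeId);
      if (tasks != null && !tasks.isEmpty()) {
        containerVolume.put(containerId, volumeId);
        return tasks.poll();
      }
      unassigned.remove(volumeId);             // this volume is drained
      if (volumeId == -1 || unassigned.isEmpty()) return null;
      volumeId = pickVolume();                 // try another volume, possibly -1
    }
  }

  private int pickVolume() {
    // Simplest possible policy: any remaining volume, local disks before -1.
    return unassigned.keySet().stream()
        .max(Comparator.naturalOrder())        // -1 (remote) is chosen last
        .orElse(-1);
  }

  public static void main(String[] args) {
    PendingTasks hostA = new PendingTasks();
    hostA.add(0, "t1");                        // one local task on disk 0
    hostA.add(-1, "remote-t9");                // one remote pending task
    System.out.println(hostA.nextTask("c1"));  // t1        (local, disk 0)
    System.out.println(hostA.nextTask("c1"));  // remote-t9 (falls back to -1)
    System.out.println(hostA.nextTask("c1"));  // null      (everything drained)
  }
}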

--Jinho

On Feb 13, 2014, at 5:29 PM, Min Zhou wrote:

> Hi Jihoon,
> 
> Thank you for you answer. However, seem you didn't answer that how tajo use
> disk information to balance the io overhead.
> 
> And still can't understand the details,  quite complex to me, especially
> the class TaskBlockLocation
> 
> 
> public static class TaskBlockLocation {
>    // This is a mapping from diskId to a list of pending task, right?
>    private HashMap<Integer, LinkedList<QueryUnitAttemptId>>
> unAssignedTaskMap =
>        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>   // How can I return a Task to the container according to the diskId?
>    private HashMap<ContainerId, Integer> assignedContainerMap = new
> HashMap<ContainerId, Integer>();
>    private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer,
> Integer>();
>    private String host;
> 
>    public TaskBlockLocation(String host){
>      this.host = host;
>    }
> 
>    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId
> attemptId){
>      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
>      if (list == null) {
>        list = new LinkedList<QueryUnitAttemptId>();
>        unAssignedTaskMap.put(volumeId, list);
>      }
>      list.add(attemptId);
> 
>      if(!volumeUsageMap.containsKey(volumeId))
> volumeUsageMap.put(volumeId, 0);
>    }
> 
>    public LinkedList<QueryUnitAttemptId>
> getQueryUnitAttemptIdList(ContainerId containerId){
>      Integer volumeId;
> 
>      if (!assignedContainerMap.containsKey(containerId)) {
>        // assign a new container to a volume with the lowest concurrency,
> right?
>        volumeId = assignVolumeId();
>        assignedContainerMap.put(containerId, volumeId);
>      } else {
>        volumeId = assignedContainerMap.get(containerId);
>      }
> 
>      LinkedList<QueryUnitAttemptId> list = null;
>      if (unAssignedTaskMap.size() >  0) {
>        int retry = unAssignedTaskMap.size();
>        do {
>          list = unAssignedTaskMap.get(volumeId);
>          if (list == null || list.size() == 0) {
>            //clean and reassign remaining volume
>            unAssignedTaskMap.remove(volumeId);
>            volumeUsageMap.remove(volumeId);
>            if (volumeId < 0) break; //  processed all block on disk
> 
>            // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
>            volumeId = assignVolumeId();
>            // WHY THIS LINE PUT AGAIN?
>            // if the container is a new container, does it put twice??
>            assignedContainerMap.put(containerId, volumeId);
>            retry--;
>          } else {
>            break;
>          }
>        } while (retry > 0);
>      }
>      return list;
>    }
> 
>    public Integer assignVolumeId(){
>      Map.Entry<Integer, Integer> volumeEntry = null;
> 
>      // choose a volume with the lowest concurrency, right?
>      for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
>        if(volumeEntry == null) volumeEntry = entry;
> 
>        if (volumeEntry.getValue() >= entry.getValue()) {
>          volumeEntry = entry;
>        }
>      }
> 
>      if(volumeEntry != null){
>        volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() +
> 1);
>        LOG.info("Assigned host : " + host + " Volume : " +
> volumeEntry.getKey() + ", Concurrency : "
>            + volumeUsageMap.get(volumeEntry.getKey()));
>        return volumeEntry.getKey();
>      } else {
>         return -1;  // processed all block on disk
>      }
>    }
> 
>    public String getHost() {
>      return host;
>    }
>  }
> 
> This class maintains a mapping (assignedContainerMap) from containerId to
> the assigned diskId, How can I retrieve a task based on the diskId to the
> container?
> 
> 
> Thanks,
> Min
> 
> 
> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <ji...@apache.org> wrote:
> 
>> Hi, Min.
>> 
>> In DefaultTaskScheduler, each container is mapped to each disk of all nodes
>> in a cluster. When a container requests a task, DefaultTaskScheduler
>> selects a closest task and assigns it to the container. This process works
>> for only the local reads. The disk volume information is not considered for
>> remote reads.
>> 
>> In my opinion, this is enough for us because there are few remote tasks in
>> each sub query. From a test on an in-house cluster composed of 32 nodes,
>> the ratio of remote tasks to whole tasks was only about 0.17% (The query
>> was 'select l_orderkey from lineitem', and the volume of the lineitem table
>> was about 1TB.). Since the number of tasks was very small, there were small
>> disk contentions.
>> 
>> Hope that answers your questions.
>> Thanks,
>> Jihoon
>> 
>> 2014-02-13 11:00 GMT+09:00 Min Zhou <co...@gmail.com>:
>> 
>>> Hi all,
>>> 
>>> Tajo leverages the feature supported by HDFS-3672, which exposes the disk
>>> volume id of each hdfs data block.  I already found the related code in
>>> DefaultTaskScheduler.assignToLeafTasks,  can anyone explain the logic for
>>> me?  What the scheduler do when the hdfs read is a remote read on the
>>> other
>>> machine's disk?
>>> 
>>> 
>>> Thanks,
>>> Min
>>> --
>>> My research interests are distributed systems, parallel computing and
>>> bytecode based virtual machine.
>>> 
>>> My profile:
>>> http://www.linkedin.com/in/coderplay
>>> My blog:
>>> http://coderplay.javaeye.com
>>> 
>> 
> 
> 
> 
> -- 
> My research interests are distributed systems, parallel computing and
> bytecode based virtual machine.
> 
> My profile:
> http://www.linkedin.com/in/coderplay
> My blog:
> http://coderplay.javaeye.com



Re: Question about disk-aware scheduling in tajo

Posted by Henry Saputra <he...@gmail.com>.
Cool, thx Hyunsik

On Wednesday, February 19, 2014, Hyunsik Choi <hy...@apache.org> wrote:

> Henry,
>
> Thank you for your comment :)
>
> In my opinion, this explanation is too specific, and the comments on the
> source code is more accessible for this kind of explanation So, now, I'll
> add this explanation on the source code as comments. Later, I'll try to
> create some design documentations.
>
> Thanks,
> Hyunsik Choi
>
>
> On Wed, Feb 19, 2014 at 10:44 AM, Henry Saputra <henry.saputra@gmail.com> wrote:
>
> > Hyunsik,
> >
> > I am +1 this is worthy of a wiki page or separate page to explain the
> > technical flow. Tracing the flow in the code is confusing.
> >
> > Maybe similar to the doc like in YARN [1] page (the more details the
> > better ^_^)
> >
> >
> > - Henry
> >
> > [1]
> >
> http://hadoop.apache.org/docs/current2/hadoop-yarn/hadoop-yarn-site/YARN.html
> >
> > On Thu, Feb 13, 2014 at 2:18 AM, Hyunsik Choi <hy...@apache.org>
> wrote:
> > > Hi Min,
> > >
> > > Above all, I'm very sorry for lack of documentations on the source
> code.
> > So
> > > far, we have developed Tajo with insufficient documentations by only
> > > pursuiting a quick and dirty manner. We should fill more documentations
> > on
> > > the source code.
> > >
> > > I'm going to explain how Tajo uses disk volume locality. Before this
> > > explanation, I would like to explain the node locality that you may
> > already
> > > know. Similar to MR, Tajo also uses three level locality for each task.
> > For
> > > each task, the scheduler finds local node, closest rack, and random
> node
> > > sequentially. In Tajo, the scheduler additionally finds the local
> volume
> > > prior to finding the local node.
> > >
> > > The important thing is that we don't need to aware of actual disk
> volume
> > > IDs in each local node, and we just assigne disk volumes to TaskRunners
> > in
> > > a node in a round robin manner. It would be sufficient to improve the
> > load
> > > balancing by considering disk volume.
> > >
> > > Initially, TaskRunners are not mapped to disk volumes in each worker.
> The
> > > mapping occurs dynamically in the scheduler. For example, there are 6
> > local
> > > tasks for some node N1 which has 3 disk volumes (v1, v2, and v3). Also,
> > > three TaskRunners (T1, T2, and T3) will be running on the node N1.
> > >
> > > When tasks are added to the scheduler, the scheduler gets the disk
> volume
> > > id from each task. As you know, each volume is just an integer which is
> > > just a logical identifier for just distinguishing different disk
> volumes.
> > > Then, the scheduler builds a map between disk volume ids (obtained from
> > > BlockStorageLocation) in each node and a list of tasks
> > > (DefaultTaskScheduler::addQueryUnitAttemptId). In other words, each
> entry
> > > in the map consists of one disk volume id and a list of tasks
> > corresponding
> > > to the disk volume.
> > >
> > > When the first task is requested from a TaskRunner T1 in node N1, the
> > > scheduler just assignes the first disk volume v1 to T1, and then it
> > > schedules one task which belongs to the disk volume v1. Later, a task
> is
> > > requested from a different TaskRunner T2 from node N1, the schedules
> > > assignes the second disk volume v2 to T2, and then it schedules a task
> > > which belongs to the disk volume v2.  Also, a task request is given
> from
> > T1
> > > again, the scheduler schedules one task in the disk volume v1 to T1
> > because
> > > T1 is already mapped to v1.
> > >
> > > Like MR, Tajo uses a dynamic scheduling, and it works very well in the
> > > environments where each node has different performance disks. If you
> have
> > > additional question, please feel free to ask.
> > >
> > > Also, I'll create a Jira issue to add this explain to
> > DefaultTaskScheduler.
> > >
> > > - hyunsik
> > >
> > >
> > >
> > > On Thu, Feb 13, 2014 at 5:29 PM, Min Zhou <co...@gmail.com> wrote:
> > >
> > >> Hi Jihoon,
> > >>
> > >> Thank you for you answer. However, seem you didn't answer that how
> tajo
> > use
> > >> disk information to balance the io overhead.
> > >>
> > >> And still can't understand the details,  quite complex to me,
> especially
> > >> the class TaskBlockLocation
> > >>
> > >>
> > >> public static class TaskBlockLocation {
> > >>     // This is a mapping from diskId to a list of pending task, right?
> > >>     private HashMap<Integer, LinkedList<QueryUnitAttemptId>

Re: Question about disk-aware scheduling in tajo

Posted by Hyunsik Choi <hy...@apache.org>.
Henry,

Thank you for your comment :)

In my opinion, this explanation is too specific, and comments in the source
code are a more accessible place for this kind of explanation. So, for now,
I'll add this explanation to the source code as comments. Later, I'll try to
create some design documentation.

Thanks,
Hyunsik Choi


On Wed, Feb 19, 2014 at 10:44 AM, Henry Saputra <he...@gmail.com> wrote:

> Hyunsik,
>
> I am +1 this is worthy of a wiki page or separate page to explain the
> technical flow. Tracing the flow in the code is confusing.
>
> Maybe similar to the doc like in YARN [1] page (the more details the
> better ^_^)
>
>
> - Henry
>
> [1]
> http://hadoop.apache.org/docs/current2/hadoop-yarn/hadoop-yarn-site/YARN.html
>
> On Thu, Feb 13, 2014 at 2:18 AM, Hyunsik Choi <hy...@apache.org> wrote:
> > Hi Min,
> >
> > Above all, I'm very sorry for lack of documentations on the source code.
> So
> > far, we have developed Tajo with insufficient documentations by only
> > pursuiting a quick and dirty manner. We should fill more documentations
> on
> > the source code.
> >
> > I'm going to explain how Tajo uses disk volume locality. Before this
> > explanation, I would like to explain the node locality that you may
> already
> > know. Similar to MR, Tajo also uses three level locality for each task.
> For
> > each task, the scheduler finds local node, closest rack, and random node
> > sequentially. In Tajo, the scheduler additionally finds the local volume
> > prior to finding the local node.
> >
> > The important thing is that we don't need to aware of actual disk volume
> > IDs in each local node, and we just assigne disk volumes to TaskRunners
> in
> > a node in a round robin manner. It would be sufficient to improve the
> load
> > balancing by considering disk volume.
> >
> > Initially, TaskRunners are not mapped to disk volumes in each worker. The
> > mapping occurs dynamically in the scheduler. For example, there are 6
> local
> > tasks for some node N1 which has 3 disk volumes (v1, v2, and v3). Also,
> > three TaskRunners (T1, T2, and T3) will be running on the node N1.
> >
> > When tasks are added to the scheduler, the scheduler gets the disk volume
> > id from each task. As you know, each volume is just an integer which is
> > just a logical identifier for just distinguishing different disk volumes.
> > Then, the scheduler builds a map between disk volume ids (obtained from
> > BlockStorageLocation) in each node and a list of tasks
> > (DefaultTaskScheduler::addQueryUnitAttemptId). In other words, each entry
> > in the map consists of one disk volume id and a list of tasks
> corresponding
> > to the disk volume.
> >
> > When the first task is requested from a TaskRunner T1 in node N1, the
> > scheduler just assignes the first disk volume v1 to T1, and then it
> > schedules one task which belongs to the disk volume v1. Later, a task is
> > requested from a different TaskRunner T2 from node N1, the schedules
> > assignes the second disk volume v2 to T2, and then it schedules a task
> > which belongs to the disk volume v2.  Also, a task request is given from
> T1
> > again, the scheduler schedules one task in the disk volume v1 to T1
> because
> > T1 is already mapped to v1.
> >
> > Like MR, Tajo uses a dynamic scheduling, and it works very well in the
> > environments where each node has different performance disks. If you have
> > additional question, please feel free to ask.
> >
> > Also, I'll create a Jira issue to add this explain to
> DefaultTaskScheduler.
> >
> > - hyunsik
> >
> >
> >
> > On Thu, Feb 13, 2014 at 5:29 PM, Min Zhou <co...@gmail.com> wrote:
> >
> >> Hi Jihoon,
> >>
> >> Thank you for you answer. However, seem you didn't answer that how tajo
> use
> >> disk information to balance the io overhead.
> >>
> >> And still can't understand the details,  quite complex to me, especially
> >> the class TaskBlockLocation
> >>
> >>
> >> public static class TaskBlockLocation {
> >>     // This is a mapping from diskId to a list of pending task, right?
> >>     private HashMap<Integer, LinkedList<QueryUnitAttemptId>>
> >> unAssignedTaskMap =
> >>         new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
> >>    // How can I return a Task to the container according to the diskId?
> >>     private HashMap<ContainerId, Integer> assignedContainerMap = new
> >> HashMap<ContainerId, Integer>();
> >>     private TreeMap<Integer, Integer> volumeUsageMap = new
> TreeMap<Integer,
> >> Integer>();
> >>     private String host;
> >>
> >>     public TaskBlockLocation(String host){
> >>       this.host = host;
> >>     }
> >>
> >>     public void addQueryUnitAttemptId(Integer volumeId,
> QueryUnitAttemptId
> >> attemptId){
> >>       LinkedList<QueryUnitAttemptId> list =
> >> unAssignedTaskMap.get(volumeId);
> >>       if (list == null) {
> >>         list = new LinkedList<QueryUnitAttemptId>();
> >>         unAssignedTaskMap.put(volumeId, list);
> >>       }
> >>       list.add(attemptId);
> >>
> >>       if(!volumeUsageMap.containsKey(volumeId))
> >> volumeUsageMap.put(volumeId, 0);
> >>     }
> >>
> >>     public LinkedList<QueryUnitAttemptId>
> >> getQueryUnitAttemptIdList(ContainerId containerId){
> >>       Integer volumeId;
> >>
> >>       if (!assignedContainerMap.containsKey(containerId)) {
> >>         // assign a new container to a volume with the lowest
> concurrency,
> >> right?
> >>         volumeId = assignVolumeId();
> >>         assignedContainerMap.put(containerId, volumeId);
> >>       } else {
> >>         volumeId = assignedContainerMap.get(containerId);
> >>       }
> >>
> >>       LinkedList<QueryUnitAttemptId> list = null;
> >>       if (unAssignedTaskMap.size() >  0) {
> >>         int retry = unAssignedTaskMap.size();
> >>         do {
> >>           list = unAssignedTaskMap.get(volumeId);
> >>           if (list == null || list.size() == 0) {
> >>             //clean and reassign remaining volume
> >>             unAssignedTaskMap.remove(volumeId);
> >>             volumeUsageMap.remove(volumeId);
> >>             if (volumeId < 0) break; //  processed all block on disk
> >>
> >>             // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
> >>             volumeId = assignVolumeId();
> >>             // WHY THIS LINE PUT AGAIN?
> >>             // if the container is a new container, does it put twice??
> >>             assignedContainerMap.put(containerId, volumeId);
> >>             retry--;
> >>           } else {
> >>             break;
> >>           }
> >>         } while (retry > 0);
> >>       }
> >>       return list;
> >>     }
> >>
> >>     public Integer assignVolumeId(){
> >>       Map.Entry<Integer, Integer> volumeEntry = null;
> >>
> >>       // choose a volume with the lowest concurrency, right?
> >>       for (Map.Entry<Integer, Integer> entry :
> volumeUsageMap.entrySet()) {
> >>         if(volumeEntry == null) volumeEntry = entry;
> >>
> >>         if (volumeEntry.getValue() >= entry.getValue()) {
> >>           volumeEntry = entry;
> >>         }
> >>       }
> >>
> >>       if(volumeEntry != null){
> >>         volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue()
> +
> >> 1);
> >>         LOG.info("Assigned host : " + host + " Volume : " +
> >> volumeEntry.getKey() + ", Concurrency : "
> >>             + volumeUsageMap.get(volumeEntry.getKey()));
> >>         return volumeEntry.getKey();
> >>       } else {
> >>          return -1;  // processed all block on disk
> >>       }
> >>     }
> >>
> >>     public String getHost() {
> >>       return host;
> >>     }
> >>   }
> >>
> >> This class maintains a mapping (assignedContainerMap) from containerId
> to
> >> the assigned diskId, How can I retrieve a task based on the diskId to
> the
> >> container?
> >>
> >>
> >> Thanks,
> >> Min
> >>
> >>
> >> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <ji...@apache.org>
> wrote:
> >>
> >> > Hi, Min.
> >> >
> >> > In DefaultTaskScheduler, each container is mapped to each disk of all
> >> nodes
> >> > in a cluster. When a container requests a task, DefaultTaskScheduler
> >> > selects a closest task and assigns it to the container. This process
> >> works
> >> > for only the local reads. The disk volume information is not
> considered
> >> for
> >> > remote reads.
> >> >
> >> > In my opinion, this is enough for us because there are few remote
> tasks
> >> in
> >> > each sub query. From a test on an in-house cluster composed of 32
> nodes,
> >> > the ratio of remote tasks to whole tasks was only about 0.17% (The
> query
> >> > was 'select l_orderkey from lineitem', and the volume of the lineitem
> >> table
> >> > was about 1TB.). Since the number of tasks was very small, there were
> >> small
> >> > disk contentions.
> >> >
> >> > Hope that answers your questions.
> >> > Thanks,
> >> > Jihoon
> >> >
> >> > 2014-02-13 11:00 GMT+09:00 Min Zhou <co...@gmail.com>:
> >> >
> >> > > Hi all,
> >> > >
> >> > > Tajo leverages the feature supported by HDFS-3672, which exposes the
> >> disk
> >> > > volume id of each hdfs data block.  I already found the related
> code in
> >> > > DefaultTaskScheduler.assignToLeafTasks,  can anyone explain the
> logic
> >> for
> >> > > me?  What the scheduler do when the hdfs read is a remote read on
> the
> >> > > other
> >> > > machine's disk?
> >> > >
> >> > >
> >> > > Thanks,
> >> > > Min
> >> > > --
> >> > > My research interests are distributed systems, parallel computing
> and
> >> > > bytecode based virtual machine.
> >> > >
> >> > > My profile:
> >> > > http://www.linkedin.com/in/coderplay
> >> > > My blog:
> >> > > http://coderplay.javaeye.com
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> My research interests are distributed systems, parallel computing and
> >> bytecode based virtual machine.
> >>
> >> My profile:
> >> http://www.linkedin.com/in/coderplay
> >> My blog:
> >> http://coderplay.javaeye.com
> >>
>

Re: Question about disk-aware scheduling in tajo

Posted by Henry Saputra <he...@gmail.com>.
Hyunsik,

I am +1; this is worthy of a wiki page or a separate page to explain the
technical flow. Tracing the flow in the code is confusing.

Maybe something similar to the YARN architecture doc [1] (the more details the better ^_^)


- Henry

[1] http://hadoop.apache.org/docs/current2/hadoop-yarn/hadoop-yarn-site/YARN.html

On Thu, Feb 13, 2014 at 2:18 AM, Hyunsik Choi <hy...@apache.org> wrote:
> Hi Min,
>
> Above all, I'm very sorry for lack of documentations on the source code. So
> far, we have developed Tajo with insufficient documentations by only
> pursuiting a quick and dirty manner. We should fill more documentations on
> the source code.
>
> I'm going to explain how Tajo uses disk volume locality. Before this
> explanation, I would like to explain the node locality that you may already
> know. Similar to MR, Tajo also uses three level locality for each task. For
> each task, the scheduler finds local node, closest rack, and random node
> sequentially. In Tajo, the scheduler additionally finds the local volume
> prior to finding the local node.
>
> The important thing is that we don't need to aware of actual disk volume
> IDs in each local node, and we just assigne disk volumes to TaskRunners in
> a node in a round robin manner. It would be sufficient to improve the load
> balancing by considering disk volume.
>
> Initially, TaskRunners are not mapped to disk volumes in each worker. The
> mapping occurs dynamically in the scheduler. For example, there are 6 local
> tasks for some node N1 which has 3 disk volumes (v1, v2, and v3). Also,
> three TaskRunners (T1, T2, and T3) will be running on the node N1.
>
> When tasks are added to the scheduler, the scheduler gets the disk volume
> id from each task. As you know, each volume is just an integer which is
> just a logical identifier for just distinguishing different disk volumes.
> Then, the scheduler builds a map between disk volume ids (obtained from
> BlockStorageLocation) in each node and a list of tasks
> (DefaultTaskScheduler::addQueryUnitAttemptId). In other words, each entry
> in the map consists of one disk volume id and a list of tasks corresponding
> to the disk volume.
>
> When the first task is requested from a TaskRunner T1 in node N1, the
> scheduler just assignes the first disk volume v1 to T1, and then it
> schedules one task which belongs to the disk volume v1. Later, a task is
> requested from a different TaskRunner T2 from node N1, the schedules
> assignes the second disk volume v2 to T2, and then it schedules a task
> which belongs to the disk volume v2.  Also, a task request is given from T1
> again, the scheduler schedules one task in the disk volume v1 to T1 because
> T1 is already mapped to v1.
>
> Like MR, Tajo uses a dynamic scheduling, and it works very well in the
> environments where each node has different performance disks. If you have
> additional question, please feel free to ask.
>
> Also, I'll create a Jira issue to add this explain to DefaultTaskScheduler.
>
> - hyunsik
>
>
>
> On Thu, Feb 13, 2014 at 5:29 PM, Min Zhou <co...@gmail.com> wrote:
>
>> Hi Jihoon,
>>
>> Thank you for you answer. However, seem you didn't answer that how tajo use
>> disk information to balance the io overhead.
>>
>> And still can't understand the details,  quite complex to me, especially
>> the class TaskBlockLocation
>>
>>
>> public static class TaskBlockLocation {
>>     // This is a mapping from diskId to a list of pending task, right?
>>     private HashMap<Integer, LinkedList<QueryUnitAttemptId>>
>> unAssignedTaskMap =
>>         new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>>    // How can I return a Task to the container according to the diskId?
>>     private HashMap<ContainerId, Integer> assignedContainerMap = new
>> HashMap<ContainerId, Integer>();
>>     private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer,
>> Integer>();
>>     private String host;
>>
>>     public TaskBlockLocation(String host){
>>       this.host = host;
>>     }
>>
>>     public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId
>> attemptId){
>>       LinkedList<QueryUnitAttemptId> list =
>> unAssignedTaskMap.get(volumeId);
>>       if (list == null) {
>>         list = new LinkedList<QueryUnitAttemptId>();
>>         unAssignedTaskMap.put(volumeId, list);
>>       }
>>       list.add(attemptId);
>>
>>       if(!volumeUsageMap.containsKey(volumeId))
>> volumeUsageMap.put(volumeId, 0);
>>     }
>>
>>     public LinkedList<QueryUnitAttemptId>
>> getQueryUnitAttemptIdList(ContainerId containerId){
>>       Integer volumeId;
>>
>>       if (!assignedContainerMap.containsKey(containerId)) {
>>         // assign a new container to a volume with the lowest concurrency,
>> right?
>>         volumeId = assignVolumeId();
>>         assignedContainerMap.put(containerId, volumeId);
>>       } else {
>>         volumeId = assignedContainerMap.get(containerId);
>>       }
>>
>>       LinkedList<QueryUnitAttemptId> list = null;
>>       if (unAssignedTaskMap.size() >  0) {
>>         int retry = unAssignedTaskMap.size();
>>         do {
>>           list = unAssignedTaskMap.get(volumeId);
>>           if (list == null || list.size() == 0) {
>>             //clean and reassign remaining volume
>>             unAssignedTaskMap.remove(volumeId);
>>             volumeUsageMap.remove(volumeId);
>>             if (volumeId < 0) break; //  processed all block on disk
>>
>>             // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
>>             volumeId = assignVolumeId();
>>             // WHY THIS LINE PUT AGAIN?
>>             // if the container is a new container, does it put twice??
>>             assignedContainerMap.put(containerId, volumeId);
>>             retry--;
>>           } else {
>>             break;
>>           }
>>         } while (retry > 0);
>>       }
>>       return list;
>>     }
>>
>>     public Integer assignVolumeId(){
>>       Map.Entry<Integer, Integer> volumeEntry = null;
>>
>>       // choose a volume with the lowest concurrency, right?
>>       for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
>>         if(volumeEntry == null) volumeEntry = entry;
>>
>>         if (volumeEntry.getValue() >= entry.getValue()) {
>>           volumeEntry = entry;
>>         }
>>       }
>>
>>       if(volumeEntry != null){
>>         volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() +
>> 1);
>>         LOG.info("Assigned host : " + host + " Volume : " +
>> volumeEntry.getKey() + ", Concurrency : "
>>             + volumeUsageMap.get(volumeEntry.getKey()));
>>         return volumeEntry.getKey();
>>       } else {
>>          return -1;  // processed all block on disk
>>       }
>>     }
>>
>>     public String getHost() {
>>       return host;
>>     }
>>   }
>>
>> This class maintains a mapping (assignedContainerMap) from containerId to
>> the assigned diskId, How can I retrieve a task based on the diskId to the
>> container?
>>
>>
>> Thanks,
>> Min
>>
>>
>> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <ji...@apache.org> wrote:
>>
>> > Hi, Min.
>> >
>> > In DefaultTaskScheduler, each container is mapped to each disk of all
>> nodes
>> > in a cluster. When a container requests a task, DefaultTaskScheduler
>> > selects a closest task and assigns it to the container. This process
>> works
>> > for only the local reads. The disk volume information is not considered
>> for
>> > remote reads.
>> >
>> > In my opinion, this is enough for us because there are few remote tasks
>> in
>> > each sub query. From a test on an in-house cluster composed of 32 nodes,
>> > the ratio of remote tasks to whole tasks was only about 0.17% (The query
>> > was 'select l_orderkey from lineitem', and the volume of the lineitem
>> table
>> > was about 1TB.). Since the number of tasks was very small, there were
>> small
>> > disk contentions.
>> >
>> > Hope that answers your questions.
>> > Thanks,
>> > Jihoon
>> >
>> > 2014-02-13 11:00 GMT+09:00 Min Zhou <co...@gmail.com>:
>> >
>> > > Hi all,
>> > >
>> > > Tajo leverages the feature supported by HDFS-3672, which exposes the
>> disk
>> > > volume id of each hdfs data block.  I already found the related code in
>> > > DefaultTaskScheduler.assignToLeafTasks,  can anyone explain the logic
>> for
>> > > me?  What the scheduler do when the hdfs read is a remote read on the
>> > > other
>> > > machine's disk?
>> > >
>> > >
>> > > Thanks,
>> > > Min
>> > > --
>> > > My research interests are distributed systems, parallel computing and
>> > > bytecode based virtual machine.
>> > >
>> > > My profile:
>> > > http://www.linkedin.com/in/coderplay
>> > > My blog:
>> > > http://coderplay.javaeye.com
>> > >
>> >
>>
>>
>>
>> --
>> My research interests are distributed systems, parallel computing and
>> bytecode based virtual machine.
>>
>> My profile:
>> http://www.linkedin.com/in/coderplay
>> My blog:
>> http://coderplay.javaeye.com
>>

Re: Question about disk-aware scheduling in tajo

Posted by Hyunsik Choi <hy...@apache.org>.
Hi Min,

Above all, I'm very sorry for the lack of documentation in the source code.
So far, we have developed Tajo with insufficient documentation, pursuing
only a quick-and-dirty approach. We should add more documentation to the
source code.

I'm going to explain how Tajo uses disk volume locality. Before that, I
would like to recap the node locality that you may already know. Similar to
MR, Tajo uses three levels of locality for each task: the scheduler looks
for a local node, the closest rack, and then a random node, in that order.
In Tajo, the scheduler additionally looks for a local volume before looking
for a local node.

The important thing is that we don't need to be aware of the actual disk
volume IDs in each local node; we just assign disk volumes to the
TaskRunners in a node in a round-robin manner. That is sufficient to improve
load balancing by taking disk volumes into account.

Initially, TaskRunners are not mapped to disk volumes in each worker. The
mapping occurs dynamically in the scheduler. For example, suppose there are
6 local tasks for some node N1 which has 3 disk volumes (v1, v2, and v3),
and three TaskRunners (T1, T2, and T3) will be running on node N1.

When tasks are added to the scheduler, the scheduler gets the disk volume
id of each task. As you know, a volume id is just an integer, a logical
identifier that distinguishes different disk volumes. Then, for each node,
the scheduler builds a map from disk volume ids (obtained from
BlockStorageLocation) to lists of tasks
(DefaultTaskScheduler::addQueryUnitAttemptId). In other words, each entry in
the map consists of one disk volume id and the list of tasks located on that
volume.
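
For reference, here is a rough sketch of how those volume ids can be obtained
from HDFS. This is written from memory of the Hadoop 2.x client API around
HDFS-3672 (DistributedFileSystem#getFileBlockStorageLocations and
BlockStorageLocation#getVolumeIds), not copied from Tajo, so please treat the
exact signatures and the config key as assumptions to verify; the feature also
has to be enabled on the datanodes.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class VolumeIdSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumption: the datanodes expose block-to-volume metadata
    // (dfs.datanode.hdfs-blocks-metadata.enabled = true, if I remember the key correctly).
    Path file = new Path(args[0]);
    DistributedFileSystem dfs = (DistributedFileSystem) file.getFileSystem(conf);

    FileStatus status = dfs.getFileStatus(file);
    BlockLocation[] blocks = dfs.getFileBlockLocations(status, 0, status.getLen());

    // HDFS-3672: resolve, for every replica of every block, which disk (volume) holds it.
    BlockStorageLocation[] locations = dfs.getFileBlockStorageLocations(Arrays.asList(blocks));

    // VolumeId is opaque, so give each distinct id a small integer index; an integer
    // per disk is all a scheduler needs to spread tasks across volumes.
    Map<VolumeId, Integer> diskIndex = new HashMap<VolumeId, Integer>();
    for (BlockStorageLocation location : locations) {
      String[] hosts = location.getHosts();
      VolumeId[] volumes = location.getVolumeIds();   // one entry per replica/host
      for (int i = 0; i < volumes.length; i++) {
        Integer disk = diskIndex.get(volumes[i]);
        if (disk == null) {
          disk = diskIndex.size();
          diskIndex.put(volumes[i], disk);
        }
        System.out.println("block @" + location.getOffset()
            + " on " + hosts[i] + " -> disk " + disk);
      }
    }
  }
}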

When the first task is requested by a TaskRunner T1 on node N1, the
scheduler assigns the first disk volume v1 to T1, and then it schedules one
task which belongs to v1. Later, when a task is requested by a different
TaskRunner T2 on node N1, the scheduler assigns the second disk volume v2 to
T2 and then schedules a task which belongs to v2. When a task request comes
from T1 again, the scheduler schedules another task from v1 to T1, because
T1 is already mapped to v1.
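
To make this concrete, here is a tiny, self-contained sketch of the
assignment policy described above. It is illustrative only, not the actual
DefaultTaskScheduler code, and the class and method names are made up: a
container keeps the volume it was first given, and a new container gets the
volume with the lowest concurrency so far.

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the per-node volume assignment; not Tajo code.
public class VolumeAssignmentSketch {
  private final Map<String, Integer> containerVolume = new HashMap<String, Integer>();
  private final TreeMap<Integer, Integer> volumeConcurrency = new TreeMap<Integer, Integer>();

  VolumeAssignmentSketch(int numVolumes) {
    for (int v = 0; v < numVolumes; v++) {
      volumeConcurrency.put(v, 0);            // no container uses any volume yet
    }
  }

  // A container keeps its volume; a new container gets the least-used one.
  int volumeFor(String container) {
    Integer v = containerVolume.get(container);
    if (v == null) {
      v = volumeConcurrency.firstKey();
      for (Map.Entry<Integer, Integer> e : volumeConcurrency.entrySet()) {
        if (e.getValue() < volumeConcurrency.get(v)) {
          v = e.getKey();                     // volume with the lowest concurrency
        }
      }
      volumeConcurrency.put(v, volumeConcurrency.get(v) + 1);
      containerVolume.put(container, v);
    }
    return v;
  }

  public static void main(String[] args) {
    VolumeAssignmentSketch n1 = new VolumeAssignmentSketch(3); // node N1: volumes 0, 1, 2
    System.out.println(n1.volumeFor("T1"));   // 0 : first runner gets the first volume
    System.out.println(n1.volumeFor("T2"));   // 1 : second runner gets the next one
    System.out.println(n1.volumeFor("T3"));   // 2
    System.out.println(n1.volumeFor("T1"));   // 0 : T1 stays on its volume
  }
}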

Like MR, Tajo uses dynamic scheduling, and it works very well in
environments where each node has disks with different performance. If you
have additional questions, please feel free to ask.

Also, I'll create a JIRA issue to add this explanation to DefaultTaskScheduler.

- hyunsik



On Thu, Feb 13, 2014 at 5:29 PM, Min Zhou <co...@gmail.com> wrote:

> Hi Jihoon,
>
> Thank you for you answer. However, seem you didn't answer that how tajo use
> disk information to balance the io overhead.
>
> And still can't understand the details,  quite complex to me, especially
> the class TaskBlockLocation
>
>
> public static class TaskBlockLocation {
>     // This is a mapping from diskId to a list of pending task, right?
>     private HashMap<Integer, LinkedList<QueryUnitAttemptId>>
> unAssignedTaskMap =
>         new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
>    // How can I return a Task to the container according to the diskId?
>     private HashMap<ContainerId, Integer> assignedContainerMap = new
> HashMap<ContainerId, Integer>();
>     private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer,
> Integer>();
>     private String host;
>
>     public TaskBlockLocation(String host){
>       this.host = host;
>     }
>
>     public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId
> attemptId){
>       LinkedList<QueryUnitAttemptId> list =
> unAssignedTaskMap.get(volumeId);
>       if (list == null) {
>         list = new LinkedList<QueryUnitAttemptId>();
>         unAssignedTaskMap.put(volumeId, list);
>       }
>       list.add(attemptId);
>
>       if(!volumeUsageMap.containsKey(volumeId))
> volumeUsageMap.put(volumeId, 0);
>     }
>
>     public LinkedList<QueryUnitAttemptId>
> getQueryUnitAttemptIdList(ContainerId containerId){
>       Integer volumeId;
>
>       if (!assignedContainerMap.containsKey(containerId)) {
>         // assign a new container to a volume with the lowest concurrency,
> right?
>         volumeId = assignVolumeId();
>         assignedContainerMap.put(containerId, volumeId);
>       } else {
>         volumeId = assignedContainerMap.get(containerId);
>       }
>
>       LinkedList<QueryUnitAttemptId> list = null;
>       if (unAssignedTaskMap.size() >  0) {
>         int retry = unAssignedTaskMap.size();
>         do {
>           list = unAssignedTaskMap.get(volumeId);
>           if (list == null || list.size() == 0) {
>             //clean and reassign remaining volume
>             unAssignedTaskMap.remove(volumeId);
>             volumeUsageMap.remove(volumeId);
>             if (volumeId < 0) break; //  processed all block on disk
>
>             // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
>             volumeId = assignVolumeId();
>             // WHY THIS LINE PUT AGAIN?
>             // if the container is a new container, does it put twice??
>             assignedContainerMap.put(containerId, volumeId);
>             retry--;
>           } else {
>             break;
>           }
>         } while (retry > 0);
>       }
>       return list;
>     }
>
>     public Integer assignVolumeId(){
>       Map.Entry<Integer, Integer> volumeEntry = null;
>
>       // choose a volume with the lowest concurrency, right?
>       for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
>         if(volumeEntry == null) volumeEntry = entry;
>
>         if (volumeEntry.getValue() >= entry.getValue()) {
>           volumeEntry = entry;
>         }
>       }
>
>       if(volumeEntry != null){
>         volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() +
> 1);
>         LOG.info("Assigned host : " + host + " Volume : " +
> volumeEntry.getKey() + ", Concurrency : "
>             + volumeUsageMap.get(volumeEntry.getKey()));
>         return volumeEntry.getKey();
>       } else {
>          return -1;  // processed all block on disk
>       }
>     }
>
>     public String getHost() {
>       return host;
>     }
>   }
>
> This class maintains a mapping (assignedContainerMap) from containerId to
> the assigned diskId, How can I retrieve a task based on the diskId to the
> container?
>
>
> Thanks,
> Min
>
>
> On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <ji...@apache.org> wrote:
>
> > Hi, Min.
> >
> > In DefaultTaskScheduler, each container is mapped to each disk of all
> nodes
> > in a cluster. When a container requests a task, DefaultTaskScheduler
> > selects a closest task and assigns it to the container. This process
> works
> > for only the local reads. The disk volume information is not considered
> for
> > remote reads.
> >
> > In my opinion, this is enough for us because there are few remote tasks
> in
> > each sub query. From a test on an in-house cluster composed of 32 nodes,
> > the ratio of remote tasks to whole tasks was only about 0.17% (The query
> > was 'select l_orderkey from lineitem', and the volume of the lineitem
> table
> > was about 1TB.). Since the number of tasks was very small, there were
> small
> > disk contentions.
> >
> > Hope that answers your questions.
> > Thanks,
> > Jihoon
> >
> > 2014-02-13 11:00 GMT+09:00 Min Zhou <co...@gmail.com>:
> >
> > > Hi all,
> > >
> > > Tajo leverages the feature supported by HDFS-3672, which exposes the
> disk
> > > volume id of each hdfs data block.  I already found the related code in
> > > DefaultTaskScheduler.assignToLeafTasks,  can anyone explain the logic
> for
> > > me?  What the scheduler do when the hdfs read is a remote read on the
> > > other
> > > machine's disk?
> > >
> > >
> > > Thanks,
> > > Min
> > > --
> > > My research interests are distributed systems, parallel computing and
> > > bytecode based virtual machine.
> > >
> > > My profile:
> > > http://www.linkedin.com/in/coderplay
> > > My blog:
> > > http://coderplay.javaeye.com
> > >
> >
>
>
>
> --
> My research interests are distributed systems, parallel computing and
> bytecode based virtual machine.
>
> My profile:
> http://www.linkedin.com/in/coderplay
> My blog:
> http://coderplay.javaeye.com
>

Re: Question about disk-aware scheduling in tajo

Posted by Min Zhou <co...@gmail.com>.
Hi Jihoon,

Thank you for your answer. However, it seems you didn't explain how Tajo uses
the disk information to balance the I/O overhead.

I still can't understand the details; they are quite complex to me, especially
the class TaskBlockLocation:


public static class TaskBlockLocation {
    // This is a mapping from diskId to a list of pending task, right?
    private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
    // How can I return a Task to the container according to the diskId?
    private HashMap<ContainerId, Integer> assignedContainerMap =
        new HashMap<ContainerId, Integer>();
    private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer, Integer>();
    private String host;

    public TaskBlockLocation(String host){
      this.host = host;
    }

    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
      if (list == null) {
        list = new LinkedList<QueryUnitAttemptId>();
        unAssignedTaskMap.put(volumeId, list);
      }
      list.add(attemptId);

      if(!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
    }

    public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
      Integer volumeId;

      if (!assignedContainerMap.containsKey(containerId)) {
        // assign a new container to a volume with the lowest concurrency, right?
        volumeId = assignVolumeId();
        assignedContainerMap.put(containerId, volumeId);
      } else {
        volumeId = assignedContainerMap.get(containerId);
      }

      LinkedList<QueryUnitAttemptId> list = null;
      if (unAssignedTaskMap.size() > 0) {
        int retry = unAssignedTaskMap.size();
        do {
          list = unAssignedTaskMap.get(volumeId);
          if (list == null || list.size() == 0) {
            // clean and reassign remaining volume
            unAssignedTaskMap.remove(volumeId);
            volumeUsageMap.remove(volumeId);
            if (volumeId < 0) break; // processed all block on disk

            // WHY THIS LINE ASSIGN A VOLUMEID AGAIN?
            volumeId = assignVolumeId();
            // WHY THIS LINE PUT AGAIN?
            // if the container is a new container, does it put twice??
            assignedContainerMap.put(containerId, volumeId);
            retry--;
          } else {
            break;
          }
        } while (retry > 0);
      }
      return list;
    }

    public Integer assignVolumeId(){
      Map.Entry<Integer, Integer> volumeEntry = null;

      // choose a volume with the lowest concurrency, right?
      for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
        if(volumeEntry == null) volumeEntry = entry;

        if (volumeEntry.getValue() >= entry.getValue()) {
          volumeEntry = entry;
        }
      }

      if(volumeEntry != null){
        volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
            + volumeUsageMap.get(volumeEntry.getKey()));
        return volumeEntry.getKey();
      } else {
        return -1;  // processed all block on disk
      }
    }

    public String getHost() {
      return host;
    }
  }

This class maintains a mapping (assignedContainerMap) from containerId to the
assigned diskId. How is a task retrieved based on that diskId and returned to
the container?


Thanks,
Min


On Wed, Feb 12, 2014 at 10:17 PM, Jihoon Son <ji...@apache.org> wrote:

> Hi, Min.
>
> In DefaultTaskScheduler, each container is mapped to each disk of all nodes
> in a cluster. When a container requests a task, DefaultTaskScheduler
> selects a closest task and assigns it to the container. This process works
> for only the local reads. The disk volume information is not considered for
> remote reads.
>
> In my opinion, this is enough for us because there are few remote tasks in
> each sub query. From a test on an in-house cluster composed of 32 nodes,
> the ratio of remote tasks to whole tasks was only about 0.17% (The query
> was 'select l_orderkey from lineitem', and the volume of the lineitem table
> was about 1TB.). Since the number of tasks was very small, there were small
> disk contentions.
>
> Hope that answers your questions.
> Thanks,
> Jihoon
>
> 2014-02-13 11:00 GMT+09:00 Min Zhou <co...@gmail.com>:
>
> > Hi all,
> >
> > Tajo leverages the feature supported by HDFS-3672, which exposes the disk
> > volume id of each hdfs data block.  I already found the related code in
> > DefaultTaskScheduler.assignToLeafTasks,  can anyone explain the logic for
> > me?  What the scheduler do when the hdfs read is a remote read on the
> > other
> > machine's disk?
> >
> >
> > Thanks,
> > Min
> > --
> > My research interests are distributed systems, parallel computing and
> > bytecode based virtual machine.
> >
> > My profile:
> > http://www.linkedin.com/in/coderplay
> > My blog:
> > http://coderplay.javaeye.com
> >
>



-- 
My research interests are distributed systems, parallel computing and
bytecode based virtual machine.

My profile:
http://www.linkedin.com/in/coderplay
My blog:
http://coderplay.javaeye.com

Re: Question about disk-aware scheduling in tajo

Posted by Jihoon Son <ji...@apache.org>.
Hi, Min.

In DefaultTaskScheduler, each container is mapped to one of the disks of a
node in the cluster. When a container requests a task, DefaultTaskScheduler
selects the closest task and assigns it to the container. This process
applies only to local reads; the disk volume information is not considered
for remote reads.

In my opinion, this is enough for us because there are few remote tasks in
each sub-query. In a test on an in-house cluster of 32 nodes, the ratio of
remote tasks to all tasks was only about 0.17% (the query was 'select
l_orderkey from lineitem', and the volume of the lineitem table was about
1 TB). Since the number of remote tasks was very small, there was little
disk contention.

Hope that answers your questions.
Thanks,
Jihoon

2014-02-13 11:00 GMT+09:00 Min Zhou <co...@gmail.com>:

> Hi all,
>
> Tajo leverages the feature supported by HDFS-3672, which exposes the disk
> volume id of each hdfs data block.  I already found the related code in
> DefaultTaskScheduler.assignToLeafTasks,  can anyone explain the logic for
> me?  What the scheduler do when the hdfs read is a remote read on the
> other
> machine's disk?
>
>
> Thanks,
> Min
> --
> My research interests are distributed systems, parallel computing and
> bytecode based virtual machine.
>
> My profile:
> http://www.linkedin.com/in/coderplay
> My blog:
> http://coderplay.javaeye.com
>