You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-dev@hadoop.apache.org by "dhruba borthakur (JIRA)" <ji...@apache.org> on 2007/03/07 00:27:24 UTC

[jira] Created: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

DFS Scalability: high CPU usage in choosing replication targets and file open
-----------------------------------------------------------------------------

                 Key: HADOOP-1073
                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
             Project: Hadoop
          Issue Type: Bug
          Components: dfs
            Reporter: dhruba borthakur


I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 

The positives first:
1. Datanodes continue to heartbeat and there are no cascading failures.
2. chooseRandom() does not use much CPU and is very lightweight.


An analysis of the namenode shows the following:

1. High CPU usage in FSNamesystem.getPipeline().
2. Moderate CPU usage in FSNamesystem.sortByDistance().

The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.

I have two proposals to address this problem. Please comment.

Proposal 1: Optimize getDistance()
--------------
In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.

Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.

Also, the calls to check checkArgument() from getDistance() may be removed. 
Also, the call to getPipeline() may be done outside the global FSNamesystem lock.


Proposal 2: Distribute the workload to the DFSClient
---------------
The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.

If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.






-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Raghu Angadi (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12482624 ] 

Raghu Angadi commented on HADOOP-1073:
--------------------------------------

         if( node1 == node2 || node1.equals(node2)) {
             return 0;
         }

I think we should remove equals() as well. Most real clusters will not have two data nodes on the same node.. even if someone did, usually transfer between two nodes on the same switch is just as fast as transfer from one port to another port on the same node.


> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>         Attachments: getDistance.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12482260 ] 

Hairong Kuang commented on HADOOP-1073:
---------------------------------------

+1 on Raghu's suggestion. The # of calls on getDistance could be reduced in sortedByDistance.

Another improvement to chooseTarget is to remove a data node from clusterMap when the data node is being decommissioned. So a decomissioning node will not be chosen as a replication target. This idea also came from Raghu. See his comment to HADOOP-1070.

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Assigned: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hairong Kuang reassigned HADOOP-1073:
-------------------------------------

    Assignee: Hairong Kuang

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hairong Kuang updated HADOOP-1073:
----------------------------------

    Attachment: getDistance2.patch

> still think we don't need to optimize uncommon case at the expense of common case.

You convinced me! Here is the new patch that reflects your suggestion.

> I doubt if there is any real benefit to copying locally compared transferring from another node on the same switch. Note that the data still needs to flow through DFSClient, for CRC etc. 

This could be done if we change the client/datanode protocol. We could have a special opCode indicating that the block content is in the local disk along with the source path.

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>    Affects Versions: 0.12.1
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>             Fix For: 0.13.0
>
>         Attachments: getDistance.patch, getDistance2.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hairong Kuang updated HADOOP-1073:
----------------------------------

    Attachment: getDistance.patch

This patch optimizes getDistance, contains, and isOnSameRack.

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>         Attachments: getDistance.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Raghu Angadi (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raghu Angadi updated HADOOP-1073:
---------------------------------

    Status: Patch Available  (was: Open)

The patch applies with the latest trunk.

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>    Affects Versions: 0.12.1
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>             Fix For: 0.13.0
>
>         Attachments: getDistance.patch, getDistance2.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hairong Kuang updated HADOOP-1073:
----------------------------------

        Fix Version/s: 0.13.0
    Affects Version/s: 0.12.1
               Status: Patch Available  (was: Open)

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>    Affects Versions: 0.12.1
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>             Fix For: 0.13.0
>
>         Attachments: getDistance.patch, getDistance2.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12482878 ] 

Hairong Kuang commented on HADOOP-1073:
---------------------------------------

Raghu, I would rather keep equals. We do have test clusters that run two datanodes on the same machine. Although now we do data exchange through ports on the same machine, I think we could optimize it by a disk copy later on. So it is important to make a difference.

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Sameer Paranjpye (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12482253 ] 

Sameer Paranjpye commented on HADOOP-1073:
------------------------------------------

Do we need to sort datanodes by distance? Why not just do a linear scan for the on node and on rack instances, put them at the front of the pipeline and leave the rest in random order?

Another option would be to do a linear scan and bucket nodes by distance, sorting just seems unnecessary here. 

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hairong Kuang updated HADOOP-1073:
----------------------------------

    Attachment:     (was: getDistance.patch)

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hairong Kuang updated HADOOP-1073:
----------------------------------

    Attachment: getDistance.patch

This patch optimizes the performance  of getDistance by changing clusterMap data structures. I will submit a seperate patch to incorprate other suggestions.

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>         Attachments: getDistance.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Raghu Angadi (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12482230 ] 

Raghu Angadi commented on HADOOP-1073:
--------------------------------------


One more thing, I think we should calculate distance once for each replica and use the value in sort. This would cut calls to getDistance by half in the case of 3 replicas and more  in the case of more replicas. To facilitate this we could have an int in DatanodeDesscriptor that is set before the sort.



> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Raghu Angadi (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raghu Angadi updated HADOOP-1073:
---------------------------------

    Status: Open  (was: Patch Available)

Will resubmit after HADOOP-702 is committed.


> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>    Affects Versions: 0.12.1
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>             Fix For: 0.13.0
>
>         Attachments: getDistance.patch, getDistance2.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Tom White (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tom White updated HADOOP-1073:
------------------------------

    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

I've just committed this. Thanks Hairong!

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>    Affects Versions: 0.12.1
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>             Fix For: 0.13.0
>
>         Attachments: getDistance.patch, getDistance2.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Raghu Angadi (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12481776 ] 

Raghu Angadi commented on HADOOP-1073:
--------------------------------------

+1 for proposal 1. We should have zero string operations in getDistance().. only pointer comparisions.



> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hadoop QA (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12486538 ] 

Hadoop QA commented on HADOOP-1073:
-----------------------------------

+1, because http://issues.apache.org/jira/secure/attachment/12353899/getDistance2.patch applied and successfully tested against trunk revision http://svn.apache.org/repos/asf/lucene/hadoop/trunk/525290. Results are at http://lucene.zones.apache.org:8080/hudson/job/Hadoop-Patch

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>    Affects Versions: 0.12.1
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>             Fix For: 0.13.0
>
>         Attachments: getDistance.patch, getDistance2.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Raghu Angadi (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12482915 ] 

Raghu Angadi commented on HADOOP-1073:
--------------------------------------


I still think we don't need to optimize uncommon case at the expense of common case. I doubt if there is any real benefit to copying locally compared transferring from another node on the same switch. Note that the data still needs to flow through DFSClient, for CRC etc.



> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>         Attachments: getDistance.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12482257 ] 

Hairong Kuang commented on HADOOP-1073:
---------------------------------------

Sameer, your suggestion works when the distances between any two racks are the same, but I guess it is good for now.

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "dhruba borthakur (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12482910 ] 

dhruba borthakur commented on HADOOP-1073:
------------------------------------------

+1. Code looks good. I am running it on my cluster.

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>         Assigned To: Hairong Kuang
>         Attachments: getDistance.patch
>
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Hairong Kuang (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12478950 ] 

Hairong Kuang commented on HADOOP-1073:
---------------------------------------

+1 on proposal 1.

Is getDistance() the only reason that makes these two methods heavyweight?

> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (HADOOP-1073) DFS Scalability: high CPU usage in choosing replication targets and file open

Posted by "Sameer Paranjpye (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/HADOOP-1073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12478640 ] 

Sameer Paranjpye commented on HADOOP-1073:
------------------------------------------

A fat client makes it harder to provide back compatibility and makes implementations in multiple languages harder to maintain. We should avoid fattening the client whenever possible. I vote for proposal 1.



> DFS Scalability: high CPU usage in choosing replication targets and file open
> -----------------------------------------------------------------------------
>
>                 Key: HADOOP-1073
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1073
>             Project: Hadoop
>          Issue Type: Bug
>          Components: dfs
>            Reporter: dhruba borthakur
>
> I have a test cluster that has about 1600 data nodes. randomWriter fails to run because of map tasks fail with "connection timeout" message. The namenode quickly gets to 100% CPU usage. 
> The positives first:
> 1. Datanodes continue to heartbeat and there are no cascading failures.
> 2. chooseRandom() does not use much CPU and is very lightweight.
> An analysis of the namenode shows the following:
> 1. High CPU usage in FSNamesystem.getPipeline().
> 2. Moderate CPU usage in FSNamesystem.sortByDistance().
> The first one is used by chooseTarget() to sort a list of target-datanodes based on their distances from the writer. The second one is used by an open() call to arrange the list of datanodes so that the datanode that is closest to the reader is first in the list.
> I have two proposals to address this problem. Please comment.
> Proposal 1: Optimize getDistance()
> --------------
> In the current implementation, each datanode has a network path associated with it. For example "/default-rack/74.6.138.207:50010". The method getDistance() splits the network-pathname (using "/") and then does string-compares to determine the nearest common ancestor of two given nodes. One optimization would be to avoid string splits and comparisions while determining distance between two nodes.
> Instead, we can maintain the "height" at which a node is located in the network topology tree. The root node being at heigth 0. Also, from each InnerNode we maintain a direct reference to the parent node. If the two nodes are at the same height, send each node to its parent until we reach a common parent.  Thus the distance between the two nodes is 2x where x is the distance to the common parent.  If the nodes are at different depths to begin with, then repeatedly send the node at a greater height to its parent until the nodes are at the same height, and then continue as before.
> Also, the calls to check checkArgument() from getDistance() may be removed. 
> Also, the call to getPipeline() may be done outside the global FSNamesystem lock.
> Proposal 2: Distribute the workload to the DFSClient
> ---------------
> The namenode downloads the network topology to a dfsclient. The dfsclient caches it in memory. When a new block needs to be allocated, the namenode sends a list of unsorted datanodes to the client. The client sorts them based on the cached network topology map. Similarly, when a file is opened, the namenode sends the list of unsorted blocks that comprise this file. The dfsclient sorts them and uses them appropriately. The topology map can be compacted into maybe a 1Mb buffer for a 10000 node system.
> If the network topology is very big, then another option would be to have a set of toppology servers (that has a cached copy of the network topology) and the dfsclient contacts one of them to sort its list of target datanodes.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.