Posted to hdfs-user@hadoop.apache.org by Mehul Chadha <me...@gmail.com> on 2013/03/17 19:51:52 UTC

Strange performance bug in Hadoop MapReduce

Hi,

I am profiling Hadoop 1.0.3 under certain workloads for my research, and I
have observed some very strange performance behavior.

I am doing a simple join of two tables, and the code works as follows. The
smaller table is shipped to every mapper using the DistributedCache, while
the large table is divided across the mappers by input split size. The
mapper's setup phase builds a hash map from the small table; then, for every
record of the large table, the map function does a get() on that hash map
and writes an output record only when the lookup succeeds. No reducer is
needed for this benchmark. Here is the mapper code:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Map extends Mapper<LongWritable, Text, Text, Text> {
    // In-memory copy of the small table, built once per map task in setup().
    private HashMap<String, String> joinData = new HashMap<String, String>();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each line of the large table is expected to be "key,value".
        String[] tokens = value.toString().split(",");
        if (tokens.length == 2) {
            String joinValue = joinData.get(tokens[0]);
            if (joinValue != null) {
                // Emit a joined record only when the key is present in the
                // small table.
                context.write(new Text(tokens[0]),
                        new Text(tokens[1] + "," + joinValue));
            }
        }
    }

    @Override
    public void setup(Context context) {
        try {
            // Load the small table, shipped via the DistributedCache,
            // into the hash map.
            Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context
                    .getConfiguration());
            if (cacheFiles != null && cacheFiles.length > 0) {
                BufferedReader br = new BufferedReader(new FileReader(
                        cacheFiles[0].toString()));
                try {
                    String line;
                    while ((line = br.readLine()) != null) {
                        String[] tokens = line.split(",");
                        if (tokens.length == 2) {
                            joinData.put(tokens[0], tokens[1]);
                        }
                    }
                } finally {
                    br.close();
                }
            }
        } catch (IOException e) {
            // A missing cache file leaves joinData empty; the job then
            // emits nothing.
            e.printStackTrace();
        }
    }
}
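
The driver, hashjoindriver, is not included in this post. For reference, a
minimal sketch of what it could look like follows; the class name, the
argument order (small table, large table, output), and the job settings are
assumptions inferred from the job invocations shown below, using only
standard Hadoop 1.x APIs:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver; the real hashjoindriver may differ.
public class HashJoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // args[0] = small table, args[1] = large table, args[2] = output
        // (assumed order, matching the commands in the scenarios below).
        // Register the small table before constructing the Job, since the
        // Job takes a copy of the configuration.
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), conf);

        Job job = new Job(conf, "hashjoin");
        job.setJarByClass(HashJoinDriver.class);
        job.setMapperClass(Map.class);   // the mapper shown above
        job.setNumReduceTasks(0);        // map-only job, as described
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The point to note is that only the large table is the job's input; the small
table travels to every map task through the DistributedCache.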

The strange performance shows up in the following two scenarios. I create a
small table of 64 MB and a large table of 640 MB, and run on a cluster of
1 master and 5 slave nodes. On the local node, the small table file is named
small_table and the large table file is named large_table.
Scenario 1:

Re: Strange performance bug in Hadoop MapReduce

Posted by Mehul Chadha <me...@gmail.com>.
Sorry, the mail got sent before I completed it. The missing part follows.

Scenario 1:

    ./hadoop dfs -copyFromLocal ~/small_table /user/csv/small_path/
    ./hadoop dfs -copyFromLocal ~/large_table /user/csv/large_path/
    hadoop jar hashjoin.jar hashjoindriver /user/csv/small_path /user/csv/large_path/ /user/output

    Time taken: 1 min 28 sec

Scenario 2:

    ./hadoop dfs -copyFromLocal ~/small_table /user/csv/small_path/small
    ./hadoop dfs -copyFromLocal ~/large_table /user/csv/large_path/large
    hadoop jar hashjoin.jar hashjoindriver /user/csv/small_path/ /user/csv/large_path /user/output

    Time taken: 50 sec

I find this very strange. I have verified it many times, and the difference
is always reproducible.
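
One thing worth comparing between the two runs (a diagnostic sketch; not
something verified in this thread) is the block placement of the input
files, since a different HDFS layout can change the input splits and the
data locality of the map tasks:

    # Show files, block counts, and block locations for both layouts.
    # A difference here would point at split or locality effects rather
    # than anything in the mapper itself.
    ./hadoop fsck /user/csv/large_path -files -blocks -locations
    ./hadoop fsck /user/csv/small_path -files -blocks -locations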
