Posted to dev@hama.apache.org by "Edward J. Yoon (JIRA)" <ji...@apache.org> on 2015/08/06 12:45:04 UTC

[jira] [Comment Edited] (HAMA-970) Exception can occur if the size of splits is bigger than numBSPTasks

    [ https://issues.apache.org/jira/browse/HAMA-970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14659817#comment-14659817 ] 

Edward J. Yoon edited comment on HAMA-970 at 8/6/15 10:44 AM:
--------------------------------------------------------------

Hello JongYoon, 

This issue isn't simple; it's somewhat tricky and obviously a *core* contribution. I'm also thinking about the best way to handle it. :-)

First of all, the number of splits equals the number of HDFS blocks. For instance, a 1GB file will be split into 8 blocks if the HDFS default block size is 128MB.
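
The split count here is just a ceiling division of the file size by the block size. A minimal sketch of that arithmetic, assuming one split per HDFS block as described above; `SplitCountSketch` and `numSplits` are hypothetical names for illustration, not Hama or HDFS APIs:

```java
// Hypothetical helper, not part of Hama or HDFS: estimates the number of
// input splits under the assumption of one split per HDFS block.
final class SplitCountSketch {

  // Ceiling division: a trailing partial block still forms its own split.
  static long numSplits(long fileSizeBytes, long blockSizeBytes) {
    if (fileSizeBytes < 0 || blockSizeBytes <= 0) {
      throw new IllegalArgumentException(
          "file size must be >= 0 and block size must be > 0");
    }
    return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes;
  }

  public static void main(String[] args) {
    long oneGb = 1024L * 1024 * 1024;
    long blockSize = 128L * 1024 * 1024;
    System.out.println(numSplits(oneGb, blockSize)); // prints 8
  }
}
```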

Now, let's assume that the Hama cluster has 5 task slots. Currently, BSPJobClient will throw the "exceed max capacity" exception if the user sets the 1GB file as input, because its 8 splits exceed the 5 available tasks. To address this issue fundamentally, you should assign one or more splits to each task. Please look into HADOOP-2560 "Processing multiple input splits per mapper task" and HAMA-964.

{code}
BSPJobClient.java

      if (maxTasks < splits.length) {
        throw new IOException(
            "Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
                + splits.length + ", The number of max tasks: " + maxTasks);
      }
{code}

Once HAMA-964 is addressed, I think we can clean up this code. :-)
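
To illustrate what "one or more splits per task" could look like, here is a minimal sketch that distributes split indices round-robin over the available task slots. It is only an illustration under assumptions: `SplitGroupingSketch` and `groupSplits` are hypothetical names, splits are represented by their indices rather than by Hama's split objects, and a real fix would probably also want to take data locality into account.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Hama code: groups split indices so that
// the number of groups never exceeds the number of task slots.
final class SplitGroupingSketch {

  // Assigns split indices 0..numSplits-1 to at most maxTasks groups,
  // round-robin. Each group would be handled by one task.
  static List<List<Integer>> groupSplits(int numSplits, int maxTasks) {
    if (numSplits < 0 || maxTasks <= 0) {
      throw new IllegalArgumentException(
          "numSplits must be >= 0 and maxTasks must be > 0");
    }
    int numGroups = Math.min(numSplits, maxTasks);
    List<List<Integer>> groups = new ArrayList<>();
    for (int g = 0; g < numGroups; g++) {
      groups.add(new ArrayList<>());
    }
    for (int s = 0; s < numSplits; s++) {
      groups.get(s % numGroups).add(s);
    }
    return groups;
  }

  public static void main(String[] args) {
    // 8 splits on a cluster with 5 task slots, as in the example above.
    System.out.println(groupSplits(8, 5));
    // prints [[0, 5], [1, 6], [2, 7], [3], [4]]
  }
}
```

With 8 splits and 5 slots, three tasks get two splits each and two tasks get one, so no task index goes past the cluster capacity.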

There's also the opposite case. Let's assume that the Hama cluster has 20 task slots and the user wants to use the maximum number of tasks for 1GB of input data. The framework first creates 8 tasks for the 8 splits, and then creates 12 tasks without an assigned split. The user can then redistribute the input data among the 20 tasks within the BSP program.

{code}
JobInProgress.java

      // creates 8 tasks
      this.tasks = new TaskInProgress[numBSPTasks];
      for (int i = 0; i < splits.length; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            splits[i], this.conf, this, i);
      }

      // creates 12 tasks without assigned split 
      for (int i = splits.length; i < numBSPTasks; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            null, this.conf, this, i);
      }
{code}
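
Note that the two loops above only stay within the array bounds when numBSPTasks >= splits.length. A self-contained sketch of the same layout with that precondition made explicit; plain strings stand in for TaskInProgress and the split objects, and `TaskLayoutSketch` and `layoutTasks` are hypothetical names used only for illustration:

```java
// Hypothetical illustration, not Hama code: one array slot per task,
// holding the assigned split or null when the task has no split.
final class TaskLayoutSketch {

  static String[] layoutTasks(String[] splits, int numBSPTasks) {
    if (numBSPTasks < splits.length) {
      // Without this check, copying would run past the end of the array.
      throw new IllegalArgumentException(
          "numBSPTasks (" + numBSPTasks + ") is smaller than the number of splits ("
              + splits.length + ")");
    }
    String[] tasks = new String[numBSPTasks];
    // The first splits.length tasks get a split; the rest stay null.
    System.arraycopy(splits, 0, tasks, 0, splits.length);
    return tasks;
  }

  public static void main(String[] args) {
    String[] splits = new String[8];
    for (int i = 0; i < splits.length; i++) {
      splits[i] = "split-" + i;
    }
    String[] tasks = layoutTasks(splits, 20);
    int withoutSplit = 0;
    for (String t : tasks) {
      if (t == null) {
        withoutSplit++;
      }
    }
    System.out.println(withoutSplit); // prints 12
  }
}
```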

For this case, another option is to use the Input Partitioner. The user can split the input data into as many files as desired, which means the user can force-set the number of BSP tasks.

The job below has two input files, but if you set bsp.setNumBspTask(4), the input partitioning job splits the 2 text files into 4 files, and then the main BSP program runs with 4 tasks.

{code}

  public void testPartitioner() throws Exception {

    Configuration conf = new Configuration();
    conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
    conf.setBoolean("bsp.input.runtime.partitioning", true);
    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
    bsp.setJobName("Test partitioning with input");
    bsp.setBspClass(PartionedBSP.class);
    bsp.setNumBspTask(4);
    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
    bsp.setInputFormat(TextInputFormat.class);
    bsp.setOutputFormat(NullOutputFormat.class);
    FileInputFormat.setInputPaths(bsp, "../CHANGES.txt,../README.txt");
    bsp.setPartitioner(HashPartitioner.class);
    assertTrue(bsp.waitForCompletion(true));

    FileSystem fs = FileSystem.get(conf);
    fs.delete(OUTPUT_PATH, true);
  }
{code}
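
For intuition, a hash-based partitioner typically maps each record key to a task index by taking the key's hash modulo the number of tasks. The sketch below shows that general scheme only; it is an assumption for illustration and not necessarily what Hama's HashPartitioner does internally, and `HashPartitionSketch` and `partitionFor` are hypothetical names.

```java
// Hypothetical illustration of generic hash partitioning, not Hama code.
final class HashPartitionSketch {

  // Maps a key to a task index in the range [0, numTasks).
  static int partitionFor(Object key, int numTasks) {
    if (numTasks <= 0) {
      throw new IllegalArgumentException("numTasks must be positive");
    }
    // Mask the sign bit so negative hash codes still give a valid index.
    return (key.hashCode() & Integer.MAX_VALUE) % numTasks;
  }

  public static void main(String[] args) {
    String[] keys = {"CHANGES line 1", "README line 1", "README line 2"};
    for (String key : keys) {
      // Every key lands in one of 4 partitions, one per BSP task.
      System.out.println(partitionFor(key, 4) + " <- " + key);
    }
  }
}
```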

So, if you want to address this issue, my suggestions are as follows:

1) Please first check whether CombineFileInputFormat can be used when the number of splits is bigger than the Hama cluster's max capacity.
2) CombineFileInputFormat supports only the text file format. To support binary file formats, we need to fix HAMA-964.
3) Add documentation that explains how to handle this problem.

1. http://svn.apache.org/repos/asf/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java



was (Author: udanax):
Hello JongYoon, 

This issue isn't simple; it's somewhat tricky and obviously a *core* contribution. I'm also thinking about the best way to handle it. :-)

First of all, the number of splits equals the number of HDFS blocks. For instance, a 1GB file will be split into 8 blocks if the HDFS default block size is 128MB.

Now, let's assume that the Hama cluster has 5 task slots. Currently, BSPJobClient will throw the "exceed max capacity" exception if the user sets the 1GB file as input, because its 8 splits exceed the 5 available tasks. To address this issue fundamentally, you should assign one or more splits to each task. Please look into HADOOP-2560 "Processing multiple input splits per mapper task" and HAMA-964.

{code}
BSPJobClient.java

      if (maxTasks < splits.length) {
        throw new IOException(
            "Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
                + splits.length + ", The number of max tasks: " + maxTasks);
      }
{code}

Once HAMA-964 is addressed, I think we can clean up this code. :-)

There's also the opposite case. Let's assume that the Hama cluster has 20 task slots and the user wants to use the maximum number of tasks for 1GB of input data. The framework first creates 8 tasks for the 8 splits, and then creates 12 tasks without an assigned split. The user can then redistribute the input data among the 20 tasks within the BSP program.

{code}
JobInProgress.java

      // creates 8 tasks
      this.tasks = new TaskInProgress[numBSPTasks];
      for (int i = 0; i < splits.length; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            splits[i], this.conf, this, i);
      }

      // creates 12 tasks without assigned split 
      for (int i = splits.length; i < numBSPTasks; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            null, this.conf, this, i);
      }
{code}

For this case, another option is to use the Input Partitioner. The user can split the input data into as many files as desired, which means the user can force-set the number of BSP tasks.

{code}
  public void testPartitioner() throws Exception {

    Configuration conf = new Configuration();
    conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
    conf.setBoolean("bsp.input.runtime.partitioning", true);
    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
    bsp.setJobName("Test partitioning with input");
    bsp.setBspClass(PartionedBSP.class);
    bsp.setNumBspTask(2);
    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
    bsp.setInputFormat(TextInputFormat.class);
    bsp.setOutputFormat(NullOutputFormat.class);
    FileInputFormat.setInputPaths(bsp, "../CHANGES.txt,../README.txt");
    bsp.setPartitioner(HashPartitioner.class);
    assertTrue(bsp.waitForCompletion(true));

    FileSystem fs = FileSystem.get(conf);
    fs.delete(OUTPUT_PATH, true);
  }
{code}

So, if you want to address this issue, my suggestions are as follows:

1) Please first check whether CombineFileInputFormat can be used when the number of splits is bigger than the Hama cluster's max capacity.
2) CombineFileInputFormat supports only the text file format. To support binary file formats, we need to fix HAMA-964.
3) Add documentation that explains how to handle this problem.

1. http://svn.apache.org/repos/asf/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java


> Exception can occur if the size of splits is bigger than numBSPTasks
> --------------------------------------------------------------------
>
>                 Key: HAMA-970
>                 URL: https://issues.apache.org/jira/browse/HAMA-970
>             Project: Hama
>          Issue Type: Bug
>          Components: bsp core
>    Affects Versions: 0.7.0
>            Reporter: JongYoon Lim
>            Priority: Trivial
>         Attachments: HAMA-970.patch
>
>
> In JobInProgress, it's possible to get an Exception in initTasks(). 
> {code:java}
> this.tasks = new TaskInProgress[numBSPTasks];
> for (int i = 0; i < splits.length; i++) {
>   tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(), splits[i], this.conf, this, i);
> }
> {code}
> I'm not sure that *numBSPTasks* is always bigger than *splits.length*. 
> So, I think it's better to use the bigger of the two values to size the *tasks* array. 


