You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by ZhiHong Fu <dd...@gmail.com> on 2008/12/12 08:26:36 UTC

problem in inputSplit

Hello,

Now I have encountered a very weird problem with a custom split: I
define an IndexDirSplit containing a list of index directory paths.

I implemented it like this:

package zju.edu.tcmsearch.lucene.search.format;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.List;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;



public class IndexDirInputFormat implements InputFormat<Text,Text>{
 private static final Log LOG =
LogFactory.getLog(IndexDirInputFormat.class);
 private static final String INVALID="invalid";

 public static void configure(JobConf job){

 }
 public IndexDirReader getRecordReader(InputSplit split,
  JobConf job, Reporter reporter) throws IOException {
  reporter.setStatus(split.toString());
  return new IndexDirReader((IndexDirSplit)split,job,reporter);
 }

 public IndexDirSplit[] getSplits(JobConf job,int numSplits) throws
IOException {

  int numMaps=job.getNumMapTasks();
  LOG.info("tcm.search.indexDirs: "+job.get("tcm.search.indexDirs"));
  String[] indexDirs=job.get("tcm.search.indexDirs").split(",");
  FileSystem fs=FileSystem.get(job);

  int index=0;
  for(int i=0;i<indexDirs.length;i++){
  LOG.info("indexDirs["+i+"] : "+indexDirs[i]);
  if(!fs.exists(new Path(indexDirs[i]))||!fs.isDirectory(new
Path(indexDirs[i]))){
  LOG.error("IndexDir : "+ indexDirs[i]+"is not valid!");
  indexDirs[i]=INVALID;
  }else{
  index++;
  }
  }
  numSplits=index<numMaps?index:numMaps;
  LOG.info("numSplits : "+numSplits+";index : "+index+";numMaps :
"+numMaps);
  IndexDirSplit[] indexSplits=new IndexDirSplit[numSplits];
  LOG.info("indexDirs length: " +indexDirs.length);
  for(int i=0;i<indexDirs.length;i++){
  if(!indexDirs[i].equals(INVALID))
  LOG.info("i = "+i +"; (i+numSplits)%numSplits =
"+(i+numSplits)%numSplits);
  if(indexSplits[i]==null)
  indexSplits[i]=new IndexDirSplit();
  indexSplits[(i+numSplits)%numSplits].add(indexDirs[i]);
  LOG.info("add IndexDir "+indexDirs[i]+ "to indexSplit["+i+"]");
  }

  return indexSplits;
 }

 public void validateInput(JobConf job) throws IOException{

 }

 protected class IndexDirReader implements RecordReader<Text, Text >{

  private IndexDirSplit split;
  private JobConf job;
  private Reporter reporter;
  private int offset=0;
  public IndexDirReader(IndexDirSplit split,JobConf job,Reporter reporter){
  this.split=split;
  this.job=job;
  this.reporter=reporter;
  }
  public Text createKey(){
  return new Text();
  }

  public Text createValue(){
  return new Text();
  }

  public boolean next(Text key,Text value) throws IOException{

  List<String> dbIndexPaths=split.getDbIndexPaths();

  if(offset>dbIndexPaths.size())
  return false;

  key.set("map"+offset);
  value.set(dbIndexPaths.get(offset));
  return true;

  }
  public float getProgress() throws IOException{
  return offset;
  }
  public long getPos() throws IOException{
  return offset;
  }

  public void close() throws IOException{

  }
 }

 protected class IndexDirSplit implements InputSplit{

  protected List<String> dbIndexPaths=new ArrayList<String>();
  protected int length=0;

  public IndexDirSplit(){ }

  public long getLength() throws IOException{
  return dbIndexPaths.size();
  }


  public List<String> getDbIndexPaths() {
  return dbIndexPaths;
  }

  public void add(String indexPath){
  this.dbIndexPaths.add(indexPath);
  length++;
  }
  public String[] getLocations() throws IOException{
  return new String[]{};
  }

  public IndexDirSplit(List<String> dbIndexPaths){
  this.dbIndexPaths=dbIndexPaths;
  this.length=dbIndexPaths.size();
  }

  public void readFields(DataInput in) throws IOException {
// throw new IOException("readFields(DataInput in) method haven't been
implemented!");
  length=in.readInt();
  List<String> tmpDirs=new ArrayList<String>();
  for(int i=0;i<length;i++){
  tmpDirs.add(in.readUTF());
  }
  dbIndexPaths=tmpDirs;
  }

  public void write(DataOutput out) throws IOException{
// throw new IOException("write(DataOutput out) method haven't been
implemented!");
  out.writeInt(this.dbIndexPaths.size());
  for(int i=0;i<this.dbIndexPaths.size();i++){
  out.writeUTF(this.dbIndexPaths.get(i));
  }
  }
 }
}

But when I run it, it throws an exception like this:

08/12/12 14:16:16 INFO mapred.JobClient: Running job: job_200812121337_0006
08/12/12 14:16:17 INFO mapred.JobClient: map 0% reduce 0%
08/12/12 14:16:24 INFO mapred.JobClient: Task Id :
attempt_200812121337_0006_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.NoSuchMethodException:
zju.edu.tcmsearch.lucene.search.format.IndexDirInputFormat$IndexDirSplit.<init>()
  at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:80)
  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:199)
  at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
Caused by: java.lang.NoSuchMethodException:
zju.edu.tcmsearch.lucene.search.format.IndexDirInputFormat$IndexDirSplit.<init>()
  at java.lang.Class.getConstructor0(Class.java:2678)
  at java.lang.Class.getDeclaredConstructor(Class.java:1953)
  at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:74)
  ... 2 more

I don't know why. I have implemented a custom InputFormat like this before
and it worked, so now I am very puzzled. Can anybody help me? Thanks!

regards!

Re: problem in inputSplit

Posted by Zhengguo 'Mike' SUN <zh...@yahoo.com>.
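It seems Hadoop is complaining about the default constructor of IndexDirSplit. A non-static inner class has no real no-argument constructor: its constructor implicitly takes the enclosing IndexDirInputFormat instance, so the reflective lookup of IndexDirSplit.<init>() fails. Try changing "protected class IndexDirSplit" to "static class IndexDirSplit".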




________________________________
From: ZhiHong Fu <dd...@gmail.com>
To: core-user@hadoop.apache.org
Sent: Friday, December 12, 2008 2:26:36 AM
Subject: problem in inputSplit

Hello,

Now I have encountered a very weird problem with a custom split: I
define an IndexDirSplit containing a list of index directory paths.

I implemented it like this:

package zju.edu.tcmsearch.lucene.search.format;

import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.List;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;



public class IndexDirInputFormat implements InputFormat<Text,Text>{
private static final Log LOG =
LogFactory.getLog(IndexDirInputFormat.class);
private static final String INVALID="invalid";

public static void configure(JobConf job){

}
public IndexDirReader getRecordReader(InputSplit split,
  JobConf job, Reporter reporter) throws IOException {
  reporter.setStatus(split.toString());
  return new IndexDirReader((IndexDirSplit)split,job,reporter);
}

public IndexDirSplit[] getSplits(JobConf job,int numSplits) throws
IOException {

  int numMaps=job.getNumMapTasks();
  LOG.info("tcm.search.indexDirs: "+job.get("tcm.search.indexDirs"));
  String[] indexDirs=job.get("tcm.search.indexDirs").split(",");
  FileSystem fs=FileSystem.get(job);

  int index=0;
  for(int i=0;i<indexDirs.length;i++){
  LOG.info("indexDirs["+i+"] : "+indexDirs[i]);
  if(!fs.exists(new Path(indexDirs[i]))||!fs.isDirectory(new
Path(indexDirs[i]))){
  LOG.error("IndexDir : "+ indexDirs[i]+"is not valid!");
  indexDirs[i]=INVALID;
  }else{
  index++;
  }
  }
  numSplits=index<numMaps?index:numMaps;
  LOG.info("numSplits : "+numSplits+";index : "+index+";numMaps :
"+numMaps);
  IndexDirSplit[] indexSplits=new IndexDirSplit[numSplits];
  LOG.info("indexDirs length: " +indexDirs.length);
  for(int i=0;i<indexDirs.length;i++){
  if(!indexDirs[i].equals(INVALID))
  LOG.info("i = "+i +"; (i+numSplits)%numSplits =
"+(i+numSplits)%numSplits);
  if(indexSplits[i]==null)
  indexSplits[i]=new IndexDirSplit();
  indexSplits[(i+numSplits)%numSplits].add(indexDirs[i]);
  LOG.info("add IndexDir "+indexDirs[i]+ "to indexSplit["+i+"]");
  }

  return indexSplits;
}

public void validateInput(JobConf job) throws IOException{

}

protected class IndexDirReader implements RecordReader<Text, Text >{

  private IndexDirSplit split;
  private JobConf job;
  private Reporter reporter;
  private int offset=0;
  public IndexDirReader(IndexDirSplit split,JobConf job,Reporter reporter){
  this.split=split;
  this.job=job;
  this.reporter=reporter;
  }
  public Text createKey(){
  return new Text();
  }

  public Text createValue(){
  return new Text();
  }

  public boolean next(Text key,Text value) throws IOException{

  List<String> dbIndexPaths=split.getDbIndexPaths();

  if(offset>dbIndexPaths.size())
  return false;

  key.set("map"+offset);
  value.set(dbIndexPaths.get(offset));
  return true;

  }
  public float getProgress() throws IOException{
  return offset;
  }
  public long getPos() throws IOException{
  return offset;
  }

  public void close() throws IOException{

  }
}

protected class IndexDirSplit implements InputSplit{

  protected List<String> dbIndexPaths=new ArrayList<String>();
  protected int length=0;

  public IndexDirSplit(){ }

  public long getLength() throws IOException{
  return dbIndexPaths.size();
  }


  public List<String> getDbIndexPaths() {
  return dbIndexPaths;
  }

  public void add(String indexPath){
  this.dbIndexPaths.add(indexPath);
  length++;
  }
  public String[] getLocations() throws IOException{
  return new String[]{};
  }

  public IndexDirSplit(List<String> dbIndexPaths){
  this.dbIndexPaths=dbIndexPaths;
  this.length=dbIndexPaths.size();
  }

  public void readFields(DataInput in) throws IOException {
// throw new IOException("readFields(DataInput in) method haven't been
implemented!");
  length=in.readInt();
  List<String> tmpDirs=new ArrayList<String>();
  for(int i=0;i<length;i++){
  tmpDirs.add(in.readUTF());
  }
  dbIndexPaths=tmpDirs;
  }

  public void write(DataOutput out) throws IOException{
// throw new IOException("write(DataOutput out) method haven't been
implemented!");
  out.writeInt(this.dbIndexPaths.size());
  for(int i=0;i<this.dbIndexPaths.size();i++){
  out.writeUTF(this.dbIndexPaths.get(i));
  }
  }
}
}

But when I run it, it throws an exception like this:

08/12/12 14:16:16 INFO mapred.JobClient: Running job: job_200812121337_0006
08/12/12 14:16:17 INFO mapred.JobClient: map 0% reduce 0%
08/12/12 14:16:24 INFO mapred.JobClient: Task Id :
attempt_200812121337_0006_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.NoSuchMethodException:
zju.edu.tcmsearch.lucene.search.format.IndexDirInputFormat$IndexDirSplit.<init>()
  at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:80)
  at org.apache.hadoop.mapred.MapTask.run(MapTask.java:199)
  at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
Caused by: java.lang.NoSuchMethodException:
zju.edu.tcmsearch.lucene.search.format.IndexDirInputFormat$IndexDirSplit.<init>()
  at java.lang.Class.getConstructor0(Class.java:2678)
  at java.lang.Class.getDeclaredConstructor(Class.java:1953)
  at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:74)
  ... 2 more

I don't know why. I have implemented a custom InputFormat like this before
and it worked, so now I am very puzzled. Can anybody help me? Thanks!

regards!