You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by ZhiHong Fu <dd...@gmail.com> on 2008/12/12 08:26:36 UTC
problem in inputSplit
Hello,
I have encountered a very weird problem with a custom split. I defined an
IndexDirSplit containing a list of index directory paths and
implemented it like this:
package zju.edu.tcmsearch.lucene.search.format;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.List;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
public class IndexDirInputFormat implements InputFormat<Text,Text>{
private static final Log LOG =
LogFactory.getLog(IndexDirInputFormat.class);
private static final String INVALID="invalid";
public static void configure(JobConf job){
}
public IndexDirReader getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
return new IndexDirReader((IndexDirSplit)split,job,reporter);
}
public IndexDirSplit[] getSplits(JobConf job,int numSplits) throws
IOException {
int numMaps=job.getNumMapTasks();
LOG.info("tcm.search.indexDirs: "+job.get("tcm.search.indexDirs"));
String[] indexDirs=job.get("tcm.search.indexDirs").split(",");
FileSystem fs=FileSystem.get(job);
int index=0;
for(int i=0;i<indexDirs.length;i++){
LOG.info("indexDirs["+i+"] : "+indexDirs[i]);
if(!fs.exists(new Path(indexDirs[i]))||!fs.isDirectory(new
Path(indexDirs[i]))){
LOG.error("IndexDir : "+ indexDirs[i]+"is not valid!");
indexDirs[i]=INVALID;
}else{
index++;
}
}
numSplits=index<numMaps?index:numMaps;
LOG.info("numSplits : "+numSplits+";index : "+index+";numMaps :
"+numMaps);
IndexDirSplit[] indexSplits=new IndexDirSplit[numSplits];
LOG.info("indexDirs length: " +indexDirs.length);
for(int i=0;i<indexDirs.length;i++){
if(!indexDirs[i].equals(INVALID))
LOG.info("i = "+i +"; (i+numSplits)%numSplits =
"+(i+numSplits)%numSplits);
if(indexSplits[i]==null)
indexSplits[i]=new IndexDirSplit();
indexSplits[(i+numSplits)%numSplits].add(indexDirs[i]);
LOG.info("add IndexDir "+indexDirs[i]+ "to indexSplit["+i+"]");
}
return indexSplits;
}
public void validateInput(JobConf job) throws IOException{
}
protected class IndexDirReader implements RecordReader<Text, Text >{
private IndexDirSplit split;
private JobConf job;
private Reporter reporter;
private int offset=0;
public IndexDirReader(IndexDirSplit split,JobConf job,Reporter reporter){
this.split=split;
this.job=job;
this.reporter=reporter;
}
public Text createKey(){
return new Text();
}
public Text createValue(){
return new Text();
}
public boolean next(Text key,Text value) throws IOException{
List<String> dbIndexPaths=split.getDbIndexPaths();
if(offset>dbIndexPaths.size())
return false;
key.set("map"+offset);
value.set(dbIndexPaths.get(offset));
return true;
}
public float getProgress() throws IOException{
return offset;
}
public long getPos() throws IOException{
return offset;
}
public void close() throws IOException{
}
}
protected class IndexDirSplit implements InputSplit{
protected List<String> dbIndexPaths=new ArrayList<String>();
protected int length=0;
public IndexDirSplit(){ }
public long getLength() throws IOException{
return dbIndexPaths.size();
}
public List<String> getDbIndexPaths() {
return dbIndexPaths;
}
public void add(String indexPath){
this.dbIndexPaths.add(indexPath);
length++;
}
public String[] getLocations() throws IOException{
return new String[]{};
}
public IndexDirSplit(List<String> dbIndexPaths){
this.dbIndexPaths=dbIndexPaths;
this.length=dbIndexPaths.size();
}
public void readFields(DataInput in) throws IOException {
// throw new IOException("readFields(DataInput in) method haven't been
implemented!");
length=in.readInt();
List<String> tmpDirs=new ArrayList<String>();
for(int i=0;i<length;i++){
tmpDirs.add(in.readUTF());
}
dbIndexPaths=tmpDirs;
}
public void write(DataOutput out) throws IOException{
// throw new IOException("write(DataOutput out) method haven't been
implemented!");
out.writeInt(this.dbIndexPaths.size());
for(int i=0;i<this.dbIndexPaths.size();i++){
out.writeUTF(this.dbIndexPaths.get(i));
}
}
}
}
But when I run it, it throws an exception like this:
08/12/12 14:16:16 INFO mapred.JobClient: Running job: job_200812121337_0006
08/12/12 14:16:17 INFO mapred.JobClient: map 0% reduce 0%
08/12/12 14:16:24 INFO mapred.JobClient: Task Id :
attempt_200812121337_0006_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.NoSuchMethodException:
zju.edu.tcmsearch.lucene.search.format.IndexDirInputFormat$IndexDirSplit.<init>()
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:80)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:199)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
Caused by: java.lang.NoSuchMethodException:
zju.edu.tcmsearch.lucene.search.format.IndexDirInputFormat$IndexDirSplit.<init>()
at java.lang.Class.getConstructor0(Class.java:2678)
at java.lang.Class.getDeclaredConstructor(Class.java:1953)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:74)
... 2 more
I don't know why. I have implemented a similar custom InputFormat before and
it worked fine, so now I am very puzzled. Can anybody help me? Thanks!
Regards,
Re: problem in inputSplit
Posted by Zhengguo 'Mike' SUN <zh...@yahoo.com>.
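It seems to be complaining about the missing default constructor of IndexDirSplit. A non-static inner class's constructors implicitly take an instance of the enclosing class, so Hadoop's ReflectionUtils cannot find a no-argument IndexDirSplit.<init>(). Try changing "protected class IndexDirSplit" to "static class IndexDirSplit".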
________________________________
From: ZhiHong Fu <dd...@gmail.com>
To: core-user@hadoop.apache.org
Sent: Friday, December 12, 2008 2:26:36 AM
Subject: problem in inputSplit
Hello,
I have encountered a very weird problem with a custom split. I defined an
IndexDirSplit containing a list of index directory paths and
implemented it like this:
package zju.edu.tcmsearch.lucene.search.format;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.List;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
public class IndexDirInputFormat implements InputFormat<Text,Text>{
private static final Log LOG =
LogFactory.getLog(IndexDirInputFormat.class);
private static final String INVALID="invalid";
public static void configure(JobConf job){
}
public IndexDirReader getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
return new IndexDirReader((IndexDirSplit)split,job,reporter);
}
public IndexDirSplit[] getSplits(JobConf job,int numSplits) throws
IOException {
int numMaps=job.getNumMapTasks();
LOG.info("tcm.search.indexDirs: "+job.get("tcm.search.indexDirs"));
String[] indexDirs=job.get("tcm.search.indexDirs").split(",");
FileSystem fs=FileSystem.get(job);
int index=0;
for(int i=0;i<indexDirs.length;i++){
LOG.info("indexDirs["+i+"] : "+indexDirs[i]);
if(!fs.exists(new Path(indexDirs[i]))||!fs.isDirectory(new
Path(indexDirs[i]))){
LOG.error("IndexDir : "+ indexDirs[i]+"is not valid!");
indexDirs[i]=INVALID;
}else{
index++;
}
}
numSplits=index<numMaps?index:numMaps;
LOG.info("numSplits : "+numSplits+";index : "+index+";numMaps :
"+numMaps);
IndexDirSplit[] indexSplits=new IndexDirSplit[numSplits];
LOG.info("indexDirs length: " +indexDirs.length);
for(int i=0;i<indexDirs.length;i++){
if(!indexDirs[i].equals(INVALID))
LOG.info("i = "+i +"; (i+numSplits)%numSplits =
"+(i+numSplits)%numSplits);
if(indexSplits[i]==null)
indexSplits[i]=new IndexDirSplit();
indexSplits[(i+numSplits)%numSplits].add(indexDirs[i]);
LOG.info("add IndexDir "+indexDirs[i]+ "to indexSplit["+i+"]");
}
return indexSplits;
}
public void validateInput(JobConf job) throws IOException{
}
protected class IndexDirReader implements RecordReader<Text, Text >{
private IndexDirSplit split;
private JobConf job;
private Reporter reporter;
private int offset=0;
public IndexDirReader(IndexDirSplit split,JobConf job,Reporter reporter){
this.split=split;
this.job=job;
this.reporter=reporter;
}
public Text createKey(){
return new Text();
}
public Text createValue(){
return new Text();
}
public boolean next(Text key,Text value) throws IOException{
List<String> dbIndexPaths=split.getDbIndexPaths();
if(offset>dbIndexPaths.size())
return false;
key.set("map"+offset);
value.set(dbIndexPaths.get(offset));
return true;
}
public float getProgress() throws IOException{
return offset;
}
public long getPos() throws IOException{
return offset;
}
public void close() throws IOException{
}
}
protected class IndexDirSplit implements InputSplit{
protected List<String> dbIndexPaths=new ArrayList<String>();
protected int length=0;
public IndexDirSplit(){ }
public long getLength() throws IOException{
return dbIndexPaths.size();
}
public List<String> getDbIndexPaths() {
return dbIndexPaths;
}
public void add(String indexPath){
this.dbIndexPaths.add(indexPath);
length++;
}
public String[] getLocations() throws IOException{
return new String[]{};
}
public IndexDirSplit(List<String> dbIndexPaths){
this.dbIndexPaths=dbIndexPaths;
this.length=dbIndexPaths.size();
}
public void readFields(DataInput in) throws IOException {
// throw new IOException("readFields(DataInput in) method haven't been
implemented!");
length=in.readInt();
List<String> tmpDirs=new ArrayList<String>();
for(int i=0;i<length;i++){
tmpDirs.add(in.readUTF());
}
dbIndexPaths=tmpDirs;
}
public void write(DataOutput out) throws IOException{
// throw new IOException("write(DataOutput out) method haven't been
implemented!");
out.writeInt(this.dbIndexPaths.size());
for(int i=0;i<this.dbIndexPaths.size();i++){
out.writeUTF(this.dbIndexPaths.get(i));
}
}
}
}
But when I run it, it throws an exception like this:
08/12/12 14:16:16 INFO mapred.JobClient: Running job: job_200812121337_0006
08/12/12 14:16:17 INFO mapred.JobClient: map 0% reduce 0%
08/12/12 14:16:24 INFO mapred.JobClient: Task Id :
attempt_200812121337_0006_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.NoSuchMethodException:
zju.edu.tcmsearch.lucene.search.format.IndexDirInputFormat$IndexDirSplit.<init>()
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:80)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:199)
at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
Caused by: java.lang.NoSuchMethodException:
zju.edu.tcmsearch.lucene.search.format.IndexDirInputFormat$IndexDirSplit.<init>()
at java.lang.Class.getConstructor0(Class.java:2678)
at java.lang.Class.getDeclaredConstructor(Class.java:1953)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:74)
... 2 more
I don't know why. I have implemented a similar custom InputFormat before and
it worked fine, so now I am very puzzled. Can anybody help me? Thanks!
Regards,