You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by ZhiHong Fu <dd...@gmail.com> on 2008/11/10 12:52:50 UTC
Customized InputFormat Problem
Hello,
I am working on a task which reads database records from a web service,
and then builds an index on them. But as you know, inside Hadoop the
input formats are based on FileInputFormat, so I had to write my own
DbRecordInputFormat, and I did it like this:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import zju.edu.tcmsearch.lucene.index.format.DbRecordAndOp;
import zju.edu.tcmsearch.lucene.index.mapred.DocumentID;
import zju.edu.tcmsearch.lucene.index.TcmConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConfigurable;
public class DbRecordInputFormat implements InputFormat<DocumentID,
DbRecordAndOp>, JobConfigurable {
static final Log LOG = LogFactory.getLog(DbRecordInputFormat.class);
private static List<DbRecordAndOp> dbRecordAndOpList;
public static void init(String serviceUri){
LOG.info("DbRecordInputFormat init method is invoking....!");
dbRecordAndOpList=getDbRecordListByUri(serviceUri);
LOG.info("DbRecordList size is "+dbRecordAndOpList.size());
}
private static List<DbRecordAndOp> getDbRecordListByUri(String
serviceUri){
//TO DO: Invoking web service to get data records
// <! ------------------------------ Just for Test
-------------------------------- >
List<DbRecordAndOp> tempDbRecordList=new ArrayList<DbRecordAndOp>();
for(int i=0;i<1000;i++){
DbRecordAndOp dbRecordAndOp=new DbRecordAndOp();
dbRecordAndOp.setDbResQname("{http://www.dart.zju.edu.cn}db1");
dbRecordAndOp.setTableName("table"+i);
dbRecordAndOp.setOntoName("ontoName"+i);
dbRecordAndOp.setOntoIdentity("ontoUri"+i);
dbRecordAndOp.setOpType(TcmConstants.OP_TYPE.INSERT);
dbRecordAndOp.setShowContent("CCNT, DartGrid , CCNT, DartGrid");
tempDbRecordList.add(dbRecordAndOp);
}
// <! ------------------------------ Just for Test
-------------------------------- >
return tempDbRecordList;
}
private static void update(String newServiceUri){
dbRecordAndOpList=getDbRecordListByUri(newServiceUri);
}
protected class DbRecordReader implements RecordReader<DocumentID,
DbRecordAndOp>{
private DbRecordSplit split;
private long offset=0; //设置
Reporter reporter;
protected DbRecordReader(DbRecordSplit split,JobConf job,Reporter
reporter){
this.split=split;
this.reporter=reporter;
this.offset=split.getStart();
LOG.info("The start of Split is " +this.split.getStart());
LOG.info("The end of Split is "+this.split.getEnd());
LOG.info("Initialize offset is "+this.offset);
}
public DocumentID createKey(){
return new DocumentID();
}
public DbRecordAndOp createValue(){
return new DbRecordAndOp();
}
public long getPos() throws IOException{
return offset;
}
public float getProgress() throws IOException{
return (offset-split.getStart())/(float)split.getLength();
}
public boolean next(DocumentID key,DbRecordAndOp value) throws
IOException{
LOG.info("Offset is "+offset);
LOG.info("The start of Split is "+ split.getStart());
LOG.info("The end of Split is "+ split.getEnd());
LOG.info("Key is "+key.getText().toString());
LOG.info("Value is " +value.toString());
if(offset >split.getEnd())
return false;
// key=new DocumentID("dbRecord"+offset);
key.setText("DbRecord"+offset);
value=dbRecordAndOpList.get((int)offset);
LOG.info("Offset is "+offset);
LOG.info("Key is "+ key.toString());
LOG.info("DbRecordAndOp info is: ");
LOG.info("DbResQName is "+value.getDbResQname());
LOG.info("TableName is " +value.getTableName());
LOG.info("OntologyUri is " +value.getOntoIdentity());
LOG.info("OntoName is "+ value.getOntoName());
LOG.info("Show Content is" +value.getShowContent());
offset++;
return true;
}
public void close() throws IOException{
// try{
// dbRecordAndOpList.clear();
// }catch(Exception e){
// throw new IOException(e.getMessage());
// }
}
}
protected static class DbRecordSplit implements InputSplit{
private long end=0;
private long start=0;
public DbRecordSplit(){}
public DbRecordSplit(long start,long end){
this.start=start;
this.end=end;
}
public String[] getLocations() throws IOException{
return new String[]{};
}
public long getStart(){
return this.start;
}
public long getEnd(){
return this.end;
}
public long getLength(){
return end-start;
}
public void readFields(DataInput input) throws IOException{
// throw new IOException("DbRecordSplit readFields not
implementes!");
start=input.readLong();
end=input.readLong();
}
public void write(DataOutput output) throws IOException{
// throw new IOException("DbRecordSplit readFields not
implementes!");
output.writeLong(start);
output.writeLong(end);
}
}
public void configure(JobConf job) {
}
public DbRecordReader getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
LOG.info("DbRecordReader getRecordReader method is
invoking......!");
if(split!=null)
{
LOG.info("The split is "+ split);
}else{
LOG.info("The Split is null!");
}
return new DbRecordReader((DbRecordSplit)split,job,reporter);
}
public DbRecordSplit[] getSplits(JobConf job,int numSplits){
int length=dbRecordAndOpList.size();
int splitSize=length/numSplits;
LOG.info("DbRecordInputFormat getSplits() method is invoking!");
LOG.info("Record number is "+length);
LOG.info("Split Size is "+splitSize);
DbRecordSplit[] splits=new DbRecordSplit[numSplits];
for(int i=0;i<numSplits;i++){
DbRecordSplit tempSplit;
if((i+1)==numSplits){
tempSplit=new DbRecordSplit(i*splitSize,length-1);
}else{
tempSplit=new
DbRecordSplit(i*splitSize,i*splitSize+splitSize);
}
splits[i]=tempSplit;
LOG.info("For Split["+i+"], start index is "+
splits[i].getStart()+"; end index is "+splits[i].getEnd());
}
return splits;
}
public void validateInput(JobConf job) throws IOException{
}
}
But when I run it, an exception is thrown in the DbRecordReader.next()
method. Although I have added logging inside it, I still can't see anything
and don't know where I should check. Can anyone tell me where I can see the
real execution status, so I can find where the error is? Thanks!
08/11/10 19:36:59 INFO format.DbRecordInputFormat: Record number is 1000
08/11/10 19:36:59 INFO format.DbRecordInputFormat: Split Size is 500
08/11/10 19:36:59 INFO format.DbRecordInputFormat: For Split[0], start index
is 0; end index is 500
08/11/10 19:36:59 INFO format.DbRecordInputFormat: For Split[1], start index
is 500; end index is 999
08/11/10 19:37:00 INFO mapred.JobClient: Running job: job_200811101912_0007
08/11/10 19:37:01 INFO mapred.JobClient: map 0% reduce 0%
08/11/10 19:37:08 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000001_0, Status : FAILED
java.lang.NullPointerException
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
08/11/10 19:37:08 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000000_0, Status : FAILED
java.lang.NullPointerException
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
08/11/10 19:37:13 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000001_1, Status : FAILED
java.lang.NullPointerException
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
08/11/10 19:37:14 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000000_1, Status : FAILED
java.lang.NullPointerException
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
08/11/10 19:37:19 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000000_2, Status : FAILED
java.lang.NullPointerException
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
08/11/10 19:37:23 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000001_2, Status : FAILED
java.lang.NullPointerException
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
java.io.IOException: Job failed!
at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1113)
at
zju.edu.tcmsearch.lucene.index.mapred.IndexUpdater.run(IndexUpdater.java:58)
at
zju.edu.tcmsearch.lucene.index.main.UpdateIndex.main(UpdateIndex.java:192)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:585)
at org.apache.hadoop.util.RunJar.main(RunJar.java:155)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
08/11/10 19:37:28 INFO main.UpdateIndex: Elapsed time is 28s
Elapsed time is 28s
Re: Customized InputFormat Problem
Posted by Mice <mi...@gmail.com>.
In your next(key, value) method:
replace
value=dbRecordAndOpList.get((int)offset);
with
value.set(....)
Your value object is created by MapRunner and passed to your next(key,
value) as an argument, which is used later in your mapper. Reassigning
an argument inside a method won't change the reference the caller holds.
2008/11/11 Sharad Agarwal <sh...@yahoo-inc.com>:
>
>
>> But when I run , It will throw the exception in DbRecordReader.next()
>> method, Although I have Logged in it, I can't still see anything, and don't
>> know where I shoud to check, who can help me where I can get the real
>> excution status, so I can where the error is ! Thansks!
>>
>>
> Check the logs in the tasktracker node under userlogs. Also you can see
> the logs via jobtracker web ui.
>
--
http://www.hadoopchina.com
Re: Customized InputFormat Problem
Posted by Sharad Agarwal <sh...@yahoo-inc.com>.
> But when I run , It will throw the exception in DbRecordReader.next()
> method, Although I have Logged in it, I can't still see anything, and don't
> know where I shoud to check, who can help me where I can get the real
> excution status, so I can where the error is ! Thansks!
>
>
Check the logs in the tasktracker node under userlogs. Also you can see
the logs via jobtracker web ui.