You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by ZhiHong Fu <dd...@gmail.com> on 2008/11/10 12:52:50 UTC

Customized InputFormat Problem

Hello,

       I am working on a task that reads dbRecord data from a web service
and then builds an index on it. However, the input formats that ship with
Hadoop are based on FileInputFormat, so I have had to write my own
DbRecordInputFormat, and I did it like this:


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;

import zju.edu.tcmsearch.lucene.index.format.DbRecordAndOp;
import zju.edu.tcmsearch.lucene.index.mapred.DocumentID;
import zju.edu.tcmsearch.lucene.index.TcmConstants;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConfigurable;

public class DbRecordInputFormat implements InputFormat<DocumentID,
DbRecordAndOp>, JobConfigurable {

    static final Log LOG = LogFactory.getLog(DbRecordInputFormat.class);
    private static List<DbRecordAndOp> dbRecordAndOpList;

    public static void init(String serviceUri){
        LOG.info("DbRecordInputFormat init method is invoking....!");
        dbRecordAndOpList=getDbRecordListByUri(serviceUri);
        LOG.info("DbRecordList size is "+dbRecordAndOpList.size());
    }
    private static List<DbRecordAndOp> getDbRecordListByUri(String
serviceUri){
        //TO DO: Invoking web service to get data records

        // <! ------------------------------   Just for Test
-------------------------------- >
        List<DbRecordAndOp> tempDbRecordList=new ArrayList<DbRecordAndOp>();

        for(int i=0;i<1000;i++){
            DbRecordAndOp dbRecordAndOp=new DbRecordAndOp();
            dbRecordAndOp.setDbResQname("{http://www.dart.zju.edu.cn}db1");
            dbRecordAndOp.setTableName("table"+i);
            dbRecordAndOp.setOntoName("ontoName"+i);
            dbRecordAndOp.setOntoIdentity("ontoUri"+i);
            dbRecordAndOp.setOpType(TcmConstants.OP_TYPE.INSERT);
            dbRecordAndOp.setShowContent("CCNT, DartGrid , CCNT, DartGrid");

            tempDbRecordList.add(dbRecordAndOp);
        }

        // <! ------------------------------   Just for Test
-------------------------------- >
        return tempDbRecordList;
    }

    private static void update(String newServiceUri){
        dbRecordAndOpList=getDbRecordListByUri(newServiceUri);
    }

    protected class DbRecordReader implements RecordReader<DocumentID,
DbRecordAndOp>{

        private DbRecordSplit split;
        private long offset=0;  //设置
        Reporter reporter;

        protected DbRecordReader(DbRecordSplit split,JobConf job,Reporter
reporter){
            this.split=split;
            this.reporter=reporter;
            this.offset=split.getStart();
            LOG.info("The start of Split is " +this.split.getStart());
            LOG.info("The end of Split is "+this.split.getEnd());
            LOG.info("Initialize offset is "+this.offset);
        }

        public DocumentID createKey(){
            return new DocumentID();
        }

        public DbRecordAndOp createValue(){
            return new DbRecordAndOp();
        }

        public long getPos() throws IOException{
            return offset;
        }

        public float getProgress() throws IOException{
            return (offset-split.getStart())/(float)split.getLength();
        }

        public boolean next(DocumentID key,DbRecordAndOp value) throws
IOException{
            LOG.info("Offset is "+offset);
            LOG.info("The start of Split is "+ split.getStart());
            LOG.info("The end of Split is "+ split.getEnd());
            LOG.info("Key is "+key.getText().toString());
            LOG.info("Value is " +value.toString());

            if(offset >split.getEnd())
                return false;

//            key=new DocumentID("dbRecord"+offset);
            key.setText("DbRecord"+offset);
            value=dbRecordAndOpList.get((int)offset);

            LOG.info("Offset is "+offset);
            LOG.info("Key is "+ key.toString());
            LOG.info("DbRecordAndOp info is: ");
            LOG.info("DbResQName is "+value.getDbResQname());
            LOG.info("TableName is " +value.getTableName());
            LOG.info("OntologyUri is " +value.getOntoIdentity());
            LOG.info("OntoName is "+ value.getOntoName());
            LOG.info("Show Content is" +value.getShowContent());
            offset++;
            return true;
        }

        public void close() throws IOException{
//            try{
//                dbRecordAndOpList.clear();
//            }catch(Exception e){
//                throw new IOException(e.getMessage());
//            }
        }
    }

    protected static class DbRecordSplit implements InputSplit{
        private long end=0;
        private long start=0;

        public DbRecordSplit(){}


        public DbRecordSplit(long start,long end){
            this.start=start;
            this.end=end;
        }

        public String[] getLocations() throws IOException{
            return new String[]{};
        }

        public long getStart(){
            return this.start;
        }

        public long getEnd(){
            return this.end;
        }

        public long getLength(){
            return end-start;
        }

        public void readFields(DataInput input) throws IOException{
//            throw new IOException("DbRecordSplit readFields not
implementes!");
            start=input.readLong();
            end=input.readLong();
        }

        public void write(DataOutput output) throws IOException{
//            throw new IOException("DbRecordSplit readFields not
implementes!");
            output.writeLong(start);
            output.writeLong(end);
        }
    }


    public void configure(JobConf job) {

    }

    public DbRecordReader getRecordReader(InputSplit split,
              JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        LOG.info("DbRecordReader getRecordReader method is
invoking......!");
        if(split!=null)
        {
            LOG.info("The split is "+ split);
        }else{
            LOG.info("The Split is null!");
        }
        return new DbRecordReader((DbRecordSplit)split,job,reporter);
    }

    public DbRecordSplit[] getSplits(JobConf job,int numSplits){

        int length=dbRecordAndOpList.size();
        int splitSize=length/numSplits;
        LOG.info("DbRecordInputFormat  getSplits() method is invoking!");
        LOG.info("Record number is "+length);
        LOG.info("Split Size is "+splitSize);
        DbRecordSplit[] splits=new DbRecordSplit[numSplits];
        for(int i=0;i<numSplits;i++){
            DbRecordSplit tempSplit;
            if((i+1)==numSplits){
                tempSplit=new DbRecordSplit(i*splitSize,length-1);
            }else{
                tempSplit=new
DbRecordSplit(i*splitSize,i*splitSize+splitSize);
            }
            splits[i]=tempSplit;
            LOG.info("For Split["+i+"], start index is "+
splits[i].getStart()+"; end index is "+splits[i].getEnd());
        }
        return splits;

    }

    public void validateInput(JobConf job) throws IOException{

    }

}

But when I run it, it throws an exception in the DbRecordReader.next()
method. Although I have added logging there, I still can't see any output and
don't know where I should look. Can anyone tell me where I can find the real
execution status, so that I can see where the error is? Thanks!

08/11/10 19:36:59 INFO format.DbRecordInputFormat: Record number is 1000
08/11/10 19:36:59 INFO format.DbRecordInputFormat: Split Size is 500
08/11/10 19:36:59 INFO format.DbRecordInputFormat: For Split[0], start index
is 0; end index is 500
08/11/10 19:36:59 INFO format.DbRecordInputFormat: For Split[1], start index
is 500; end index is 999
08/11/10 19:37:00 INFO mapred.JobClient: Running job: job_200811101912_0007
08/11/10 19:37:01 INFO mapred.JobClient:  map 0% reduce 0%
08/11/10 19:37:08 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000001_0, Status : FAILED
java.lang.NullPointerException
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
        at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
        at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)

08/11/10 19:37:08 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000000_0, Status : FAILED
java.lang.NullPointerException
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
        at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
        at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)

08/11/10 19:37:13 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000001_1, Status : FAILED
java.lang.NullPointerException
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
        at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
        at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)

08/11/10 19:37:14 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000000_1, Status : FAILED
java.lang.NullPointerException
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
        at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
        at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)

08/11/10 19:37:19 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000000_2, Status : FAILED
java.lang.NullPointerException
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
        at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
        at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)

08/11/10 19:37:23 INFO mapred.JobClient: Task Id :
attempt_200811101912_0007_m_000001_2, Status : FAILED
java.lang.NullPointerException
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:98)
        at
zju.edu.tcmsearch.lucene.index.format.DbRecordInputFormat$DbRecordReader.next(DbRecordInputFormat.java:55)
        at
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:165)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:45)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
        at
org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)

java.io.IOException: Job failed!
        at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1113)
        at
zju.edu.tcmsearch.lucene.index.mapred.IndexUpdater.run(IndexUpdater.java:58)
        at
zju.edu.tcmsearch.lucene.index.main.UpdateIndex.main(UpdateIndex.java:192)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:585)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:155)
        at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
        at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
08/11/10 19:37:28 INFO main.UpdateIndex: Elapsed time is  28s
Elapsed time is 28s

Re: Customized InputFormat Problem

Posted by Mice <mi...@gmail.com>.
in your next(key,value) method:

replace
value=dbRecordAndOpList.get((int)offset);

with
value.set(....)

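Your value object is created by MapRunner and passed to your next(key,
value) as an argument, and that same object is used later in your mapper.
Java passes object references by value, so assigning a new object to the
parameter inside the method has no effect on the caller's object; you have
to fill in the object you were given instead.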

2008/11/11 Sharad Agarwal <sh...@yahoo-inc.com>:
>
>
>> But when I run , It will throw the exception in DbRecordReader.next()
>> method, Although  I have Logged in it, I can't still see anything, and don't
>> know where I shoud to check, who can help me where I can get the real
>> excution status, so I can where the error is ! Thansks!
>>
>>
> Check the logs in the tasktracker node under userlogs. Also you can see
> the logs via jobtracker web ui.
>



-- 
http://www.hadoopchina.com

Re: Customized InputFormat Problem

Posted by Sharad Agarwal <sh...@yahoo-inc.com>.

> But when I run , It will throw the exception in DbRecordReader.next()
> method, Although  I have Logged in it, I can't still see anything, and don't
> know where I shoud to check, who can help me where I can get the real
> excution status, so I can where the error is ! Thansks!
>
>   
Check the logs in the tasktracker node under userlogs. Also you can see
the logs via jobtracker web ui.