Posted to common-user@hadoop.apache.org by Erik Holstad <er...@gmail.com> on 2008/09/07 21:26:28 UTC

Failing MR jobs!

Hi!
I'm trying to run an MR job, but it keeps failing and I can't understand
why.
Sometimes it gets to 66% and sometimes to 98% or so before failing.
I had a couple of exceptions earlier that I didn't catch, which made the
job fail.


The log file from the task can be found at:
http://pastebin.com/m4414d369


and the code looks like:
//Java
import java.io.*;
import java.util.*;
import java.net.*;

//Hadoop
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

//HBase
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.mapred.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.client.*;
// org.apache.hadoop.hbase.client.HTable

//Extra
import org.apache.commons.cli.ParseException;

import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.methods.*;
import org.apache.commons.httpclient.params.HttpMethodParams;


public class SerpentMR1 extends TableMap implements Mapper, Tool {

    //Setting DebugLevel
    private static final int DL = 0;

    //Setting up the variables for the MR job
    private static final String NAME = "SerpentMR1";
    private static final String INPUTTABLE = "sources";
    private final String[] COLS = {"content:feedurl", "content:ttl",
                                   "content:updated"};


    private Configuration conf;

    public JobConf createSubmittableJob(String[] args) throws IOException{
        JobConf c = new JobConf(getConf(), SerpentMR1.class);
        String jar = "/home/hbase/SerpentMR/" + NAME + ".jar";
        c.setJar(jar);
        c.setJobName(NAME);

        int mapTasks = 4;
        int reduceTasks = 20;

        c.setNumMapTasks(mapTasks);
        c.setNumReduceTasks(reduceTasks);

        String inputCols = "";
        for (int i=0; i<COLS.length; i++){inputCols += COLS[i] + " "; }

        TableMap.initJob(INPUTTABLE, inputCols, this.getClass(),
                         Text.class, BytesWritable.class, c);
        //Classes between:

        c.setOutputFormat(TextOutputFormat.class);
        Path path = new Path("users"); //inserting into a temp table
        FileOutputFormat.setOutputPath(c, path);

        c.setReducerClass(MyReducer.class);
        return c;
    }

    public void map(ImmutableBytesWritable key, RowResult res,
                    OutputCollector output, Reporter reporter)
    throws IOException {
        Cell cellLast    = res.get(COLS[2].getBytes());//lastupdate

        long oldTime = cellLast.getTimestamp();

        Cell cell_ttl    = res.get(COLS[1].getBytes());//ttl
        long ttl = StreamyUtil.BytesToLong(cell_ttl.getValue() );
        byte[] url = null;

        long currTime = time.GetTimeInMillis();

        if(currTime - oldTime > ttl){
            url = res.get(COLS[0].getBytes()).getValue();//url
            output.collect(new Text(Base64.encode_strip(res.getRow())),
                           new BytesWritable(url));
        }
    }



    public static class MyReducer implements Reducer {
        // i.e. org.apache.hadoop.mapred.Reducer


        private int timeout = 1000; //Connection timeout in ms

        public void reduce(Object key, Iterator values,
                           OutputCollector output, Reporter rep)
        throws IOException {
            HttpClient client = new HttpClient();
            //i.e. new HttpClient(new MultiThreadedHttpConnectionManager());
            client.getHttpConnectionManager()
                .getParams().setConnectionTimeout(timeout);
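            //Note that only the connection-setup timeout is bounded here;
            //no socket (read) timeout is set, so a hung response can block
            //this reduce call indefinitely (see the first reply below).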

            GetMethod method = null;

            int stat = 0;
            String content = "";
            byte[] colFam = "select".getBytes();
            byte[] column = "lastupdate".getBytes();
            byte[] currTime = null;

            HBaseRef hbref = new HBaseRef();
            JerlType sendjerl = null; //new JerlType();
            ArrayList jd = new ArrayList();

            InputStream is = null;

            while(values.hasNext()){
                BytesWritable bw = (BytesWritable)values.next();

                String address = new String(bw.get());
                try{
                    System.out.println(address);

                    method = new GetMethod(address);
                    method.setFollowRedirects(true);

                } catch (Exception e){
                    System.err.println("Invalid Address");
                    e.printStackTrace();
                }

                if (method != null){
                    try {
                    // Execute the method.
                        stat = client.executeMethod(method);

                        if(stat == 200){
                            content = "";
                            is = method.getResponseBodyAsStream();

                            //Write to HBase new stamp select:lastupdate
                            currTime = StreamyUtil.LongToBytes(time.GetTimeInMillis());
                            jd.add(new JerlData(INPUTTABLE.getBytes(),
                                                ((Text)key).getBytes(),
                                                colFam, column, currTime));

                            if (is != null){ output.collect(((Text)key).getBytes(), is); }
                        }
                        else if(stat == 302){
                            System.err.println("302 not complete in reader!! New url = ");
                        }
                        else {
                            System.err.println("Method failed: " + method.getStatusLine());
                            //Set select:lastupdate to Long.MAX_VALUE
                            currTime = StreamyUtil.LongToBytes(Long.MAX_VALUE);
                            jd.add(new JerlData(INPUTTABLE.getBytes(),
                                                ((Text)key).getBytes(),
                                                colFam, column, currTime));
                        }

                    } catch (HttpException e) {
                        System.err.println("Fatal protocol violation: " + e.getMessage());
                        e.printStackTrace();
                    } catch (IOException e) {
                        System.err.println("Fatal transport error: " + e.getMessage());
                        e.printStackTrace();
                    } catch (IllegalStateException e){
                        //System.err.println("IllegalStateException: " + e.getMessage());
                        System.err.println("Unsupported protocol error: " + e.getMessage());
                        e.printStackTrace();
                    } catch (Exception e){
                        System.err.println("Other Exception: " + e.getMessage());
                        e.printStackTrace();
                    } finally {
                    // Release the connection.
                        method.releaseConnection();
                    }
                }
            }
            HBase.AddAttributes(
                new JerlType((JerlData[])jd.toArray(new JerlData[0])), hbref);
        }

        public void configure(JobConf conf){}

        public void close(){}
    }

    static int printUsage() {
        System.out.println(NAME + " <input> <table_name>");
        return -1;
    }

    public int run(@SuppressWarnings("unused") String[] args)
    throws Exception {
        JobClient.runJob(createSubmittableJob(args));
        return 0;
    }

    public Configuration getConf() {
        return this.conf;
    }

    public void setConf(final Configuration c) {
        this.conf = c;
    }

    public static void main(String[] args) throws Exception {
        int errCode = ToolRunner.run(new Configuration(),
                                     new SerpentMR1(), args);
        System.exit(errCode);
    }

}


Regards Erik

Re: Failing MR jobs!

Posted by Arun C Murthy <ac...@yahoo-inc.com>.
On Sep 7, 2008, at 12:26 PM, Erik Holstad wrote:

> Hi!
> I'm trying to run an MR job, but it keeps failing and I can't
> understand why.
> Sometimes it gets to 66% and sometimes to 98% or so before failing.
> I had a couple of exceptions earlier that I didn't catch, which made
> the job fail.
>
>
> The log file from the task can be found at:
> http://pastebin.com/m4414d369
>

From the logs it looks like the TaskTracker killed your reduce task
because it didn't report any progress for 10 minutes, which is the
default timeout.

FWIW, it's probably because _one_ of the calls to your 'reduce'
function got stuck trying to communicate with one of the external
resources you are using...

Arun
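
A minimal sketch of the kind of fix Arun describes, against the old
org.apache.hadoop.mapred API used in the code above (ProgressSketch,
TIMEOUT_MS, and the 30-minute figure are illustrative names and values,
not from the thread): report progress from inside the reduce loop so the
TaskTracker can see the task is alive, and put a socket timeout on the
HTTP reads so one dead server cannot stall a reduce call past the limit.

import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.httpclient.HttpClient;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class ProgressSketch {

    private static final int TIMEOUT_MS = 1000;

    public void reduce(Object key, Iterator values,
                       OutputCollector output, Reporter rep)
    throws IOException {
        HttpClient client = new HttpClient();
        //setConnectionTimeout only bounds connection setup; setSoTimeout
        //bounds each blocking read, so a hung response cannot block forever.
        client.getHttpConnectionManager().getParams()
            .setConnectionTimeout(TIMEOUT_MS);
        client.getHttpConnectionManager().getParams()
            .setSoTimeout(TIMEOUT_MS);

        while (values.hasNext()) {
            BytesWritable bw = (BytesWritable) values.next();
            //...build the GetMethod and fetch as in the code above...

            //Report liveness on every iteration; without this, a slow
            //batch of fetches trips the 10-minute default
            //(mapred.task.timeout).
            rep.progress();
        }
    }

    //Alternatively (or additionally), raise the task timeout at job setup:
    static void raiseTaskTimeout(JobConf c) {
        c.setLong("mapred.task.timeout", 30 * 60 * 1000L); //30 minutes
    }
}

Calling rep.progress() once per fetch is cheap and is usually enough; the
socket timeout matters for the case where a single fetch, rather than the
whole batch, is what hangs.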

Re: Failing MR jobs!

Posted by Shengkai Zhu <ge...@gmail.com>.
Sorry, I didn't see the log link.

On Tue, Sep 9, 2008 at 12:01 PM, Shengkai Zhu <ge...@gmail.com> wrote:

>
> Do you have some more detailed information? Logs are helpful.
>
>
> On Mon, Sep 8, 2008 at 3:26 AM, Erik Holstad <er...@gmail.com>wrote:
>
>> Hi!
>> I'm trying to run an MR job, but it keeps failing and I can't
>> understand why.
>> Sometimes it gets to 66% and sometimes to 98% or so before failing.
>> I had a couple of exceptions earlier that I didn't catch, which made
>> the job fail.
>>
>>
>> The log file from the task can be found at:
>> http://pastebin.com/m4414d369
>>
>>
>> and the code looks like:
>> [code snipped; it is quoted in full in the original message above]
>>
>> Regards Erik
>>
>
>
>
> --
>
> 朱盛凯
>
> Jash Zhu
>
> 复旦大学软件学院
>
> Software School, Fudan University
>



-- 

朱盛凯

Jash Zhu

复旦大学软件学院

Software School, Fudan University

Re: Failing MR jobs!

Posted by Shengkai Zhu <ge...@gmail.com>.
Do you have some more detailed information? Logs are helpful.

On Mon, Sep 8, 2008 at 3:26 AM, Erik Holstad <er...@gmail.com> wrote:

> Hi!
> I'm trying to run an MR job, but it keeps failing and I can't
> understand why.
> Sometimes it gets to 66% and sometimes to 98% or so before failing.
> I had a couple of exceptions earlier that I didn't catch, which made
> the job fail.
>
>
> The log file from the task can be found at:
> http://pastebin.com/m4414d369
>
>
> and the code looks like:
> [code snipped; it is quoted in full in the original message above]
>
> Regards Erik
>



-- 

朱盛凯

Jash Zhu

复旦大学软件学院

Software School, Fudan University