Posted to common-user@hadoop.apache.org by Erik Holstad <er...@gmail.com> on 2008/09/07 21:26:28 UTC
Failing MR jobs!
Hi!
I'm trying to run an MR job, but it keeps failing and I can't understand why.
Sometimes it gets to about 66% and sometimes to about 98%.
I had a couple of exceptions earlier that I didn't catch, which made the job fail.
The log file from the task can be found at:
http://pastebin.com/m4414d369
and the code looks like:
//Java
import java.io.*;
import java.util.*;
import java.net.*;
//Hadoop
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
//HBase
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.mapred.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.client.*;
// org.apache.hadoop.hbase.client.HTable
//Extra
import org.apache.commons.cli.ParseException;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.methods.*;
import org.apache.commons.httpclient.params.HttpMethodParams;
public class SerpentMR1 extends TableMap implements Mapper, Tool {
//Setting DebugLevel
private static final int DL = 0;
//Setting up the variables for the MR job
private static final String NAME = "SerpentMR1";
private static final String INPUTTABLE = "sources";
private final String[] COLS = {"content:feedurl", "content:ttl",
"content:updated"};
private Configuration conf;
public JobConf createSubmittableJob(String[] args) throws IOException{
JobConf c = new JobConf(getConf(), SerpentMR1.class);
String jar = "/home/hbase/SerpentMR/" +NAME+".jar";
c.setJar(jar);
c.setJobName(NAME);
int mapTasks = 4;
int reduceTasks = 20;
c.setNumMapTasks(mapTasks);
c.setNumReduceTasks(reduceTasks);
String inputCols = "";
for (int i=0; i<COLS.length; i++){inputCols += COLS[i] + " "; }
TableMap.initJob(INPUTTABLE, inputCols, this.getClass(), Text.class,
BytesWritable.class, c);
//Classes between:
c.setOutputFormat(TextOutputFormat.class);
Path path = new Path("users"); //inserting into a temp table
FileOutputFormat.setOutputPath(c, path);
c.setReducerClass(MyReducer.class);
return c;
}
public void map(ImmutableBytesWritable key, RowResult res,
OutputCollector output, Reporter reporter)
throws IOException {
Cell cellLast = res.get(COLS[2].getBytes());//lastupdate
long oldTime = cellLast.getTimestamp();
Cell cell_ttl = res.get(COLS[1].getBytes());//ttl
long ttl = StreamyUtil.BytesToLong(cell_ttl.getValue() );
byte[] url = null;
long currTime = time.GetTimeInMillis();
if(currTime - oldTime > ttl){
url = res.get(COLS[0].getBytes()).getValue();//url
output.collect(new Text(Base64.encode_strip(res.getRow())),
new BytesWritable(url));
}
}
public static class MyReducer implements Reducer{
//org.apache.hadoop.mapred.Reducer{
private int timeout = 1000; //Sets the connection timeout time ms;
public void reduce(Object key, Iterator values, OutputCollector
output, Reporter rep)
throws IOException {
HttpClient client = new HttpClient();
//new HttpClient(new MultiThreadedHttpConnectionManager());
client.getHttpConnectionManager().getParams().setConnectionTimeout(timeout);
GetMethod method = null;
int stat = 0;
String content = "";
byte[] colFam = "select".getBytes();
byte[] column = "lastupdate".getBytes();
byte[] currTime = null;
HBaseRef hbref = new HBaseRef();
JerlType sendjerl = null; //new JerlType();
ArrayList jd = new ArrayList();
InputStream is = null;
while(values.hasNext()){
BytesWritable bw = (BytesWritable)values.next();
String address = new String(bw.get());
try{
System.out.println(address);
method = new GetMethod(address);
method.setFollowRedirects(true);
} catch (Exception e){
System.err.println("Invalid Address");
e.printStackTrace();
}
if (method != null){
try {
// Execute the method.
stat = client.executeMethod(method);
if(stat == 200){
content = "";
is = (InputStream)(method.getResponseBodyAsStream());
//Write to HBase new stamp select:lastupdate
currTime = StreamyUtil.LongToBytes(time.GetTimeInMillis());
jd.add(new JerlData(INPUTTABLE.getBytes(),
((Text)key).getBytes(), colFam, column, currTime));
if (is != null){ output.collect(((Text)key).getBytes(), is); }
}
else if(stat == 302){
System.err.println("302 not complete in reader!! New url = ");
}
else {
System.err.println("Method failed: " + method.getStatusLine());
currTime = StreamyUtil.LongToBytes(Long.MAX_VALUE);
jd.add(new JerlData(INPUTTABLE.getBytes(),
((Text)key).getBytes(), colFam, column, currTime));
//Set select:lastupdate to Long.MAX_VALUE
}
} catch (HttpException e) {
System.err.println("Fatal protocol violation: " + e.getMessage());
e.printStackTrace();
} catch (IOException e) {
System.err.println("Fatal transport error: " + e.getMessage());
e.printStackTrace();
} catch (IllegalStateException e){
// System.err.println("IllegalStateException: " + e.getMessage());
System.err.println("Unsupported protocol error: " + e.getMessage());
e.printStackTrace();
} catch (Exception e){
System.err.println("Other Exception: " + e.getMessage());
e.printStackTrace();
} finally {
// Release the connection.
method.releaseConnection();
}
}
}
HBase.AddAttributes(new JerlType((JerlData[])jd.toArray(new JerlData[0])), hbref);
}
public void configure(JobConf conf){}
public void close(){}
}
static int printUsage() {
System.out.println(NAME + " <input> <table_name>");
return -1;
}
public int run(@SuppressWarnings("unused") String[] args) throws
Exception {
JobClient.runJob(createSubmittableJob(args));
return 0;
}
public Configuration getConf() {
return this.conf;
}
public void setConf(final Configuration c) {
this.conf = c;
}
public static void main(String[] args) throws Exception {
int errCode = ToolRunner.run(new Configuration(), new SerpentMR1(),
args);
System.exit(errCode);
}
}
Regards Erik
Re: Failing MR jobs!
Posted by Arun C Murthy <ac...@yahoo-inc.com>.
On Sep 7, 2008, at 12:26 PM, Erik Holstad wrote:
> Hi!
> I'm trying to run an MR job, but it keeps failing and I can't
> understand why.
> Sometimes it gets to about 66% and sometimes to about 98%.
> I had a couple of exceptions earlier that I didn't catch, which made
> the job fail.
>
>
> The log file from the task can be found at:
> http://pastebin.com/m4414d369
>
From the logs it looks like the TaskTracker killed your reduce task
because it didn't report any progress for 10 mins, which is the
default timeout.
FWIW it's probably because _one_ of the calls to your 'reduce'
function got stuck trying to communicate with one of the external
resources you are using...
Arun
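
[Editor's note, not part of the original thread] To avoid that timeout, the reduce can tell the TaskTracker it is still alive before each external call, and the HTTP client can be given both a connect and a read timeout so a dead server cannot block a single fetch indefinitely. Below is a rough sketch along those lines, using the same old org.apache.hadoop.mapred API and Commons HttpClient 3.x as the posted code; the class name, key/value types, and timeout values are illustrative assumptions, not taken from Erik's job.

//Rough sketch, assuming old-API (org.apache.hadoop.mapred) Hadoop and HttpClient 3.x
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class FetchingReducer extends MapReduceBase
        implements Reducer<Text, BytesWritable, Text, Text> {

    public void reduce(Text key, Iterator<BytesWritable> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        HttpClient client = new HttpClient();
        //Bound how long one fetch can block: connect timeout and socket (read) timeout
        client.getHttpConnectionManager().getParams().setConnectionTimeout(1000);
        client.getHttpConnectionManager().getParams().setSoTimeout(10000);

        while (values.hasNext()) {
            BytesWritable bw = values.next();
            String address = new String(bw.get(), 0, bw.getSize());
            //Report progress before each slow external call so the
            //TaskTracker does not kill the task after 10 idle minutes
            reporter.setStatus("fetching " + address);
            reporter.progress();
            GetMethod method = new GetMethod(address);
            try {
                int stat = client.executeMethod(method);
                if (stat == 200) {
                    output.collect(key, new Text(method.getResponseBodyAsString()));
                }
            } catch (IOException e) {
                System.err.println("Fetch failed for " + address + ": " + e.getMessage());
            } finally {
                method.releaseConnection();
            }
        }
    }
}

If the fetches legitimately take longer than ten minutes, the timeout itself can also be raised in the job setup, e.g. c.setLong("mapred.task.timeout", 30 * 60 * 1000), but reporting progress from the reduce is usually the cleaner fix.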
Re: Failing MR jobs!
Posted by Shengkai Zhu <ge...@gmail.com>.
Sorry, I didn't see the log link.
On Tue, Sep 9, 2008 at 12:01 PM, Shengkai Zhu <ge...@gmail.com> wrote:
>
> Do you have some more detailed information? Logs are helpful.
>
>
> On Mon, Sep 8, 2008 at 3:26 AM, Erik Holstad <er...@gmail.com> wrote:
>
>> Hi!
>> I'm trying to run an MR job, but it keeps failing and I can't understand
>> why.
>> Sometimes it gets to about 66% and sometimes to about 98%.
>> I had a couple of exceptions earlier that I didn't catch, which made the
>> job fail.
>>
>>
>> The log file from the task can be found at:
>> http://pastebin.com/m4414d369
>>
>>
>> Regards Erik
>>
--
朱盛凯
Jash Zhu
复旦大学软件学院
Software School, Fudan University
Re: Failing MR jobs!
Posted by Shengkai Zhu <ge...@gmail.com>.
Do you have some more detailed information? Logs are helpful.
On Mon, Sep 8, 2008 at 3:26 AM, Erik Holstad <er...@gmail.com> wrote:
> Hi!
> I'm trying to run an MR job, but it keeps failing and I can't understand
> why.
> Sometimes it gets to about 66% and sometimes to about 98%.
> I had a couple of exceptions earlier that I didn't catch, which made the
> job fail.
>
>
> The log file from the task can be found at:
> http://pastebin.com/m4414d369
>
>
> Regards Erik
>
--
朱盛凯
Jash Zhu
复旦大学软件学院
Software School, Fudan University