Posted to common-user@hadoop.apache.org by Denis Kreis <de...@gmail.com> on 2011/11/24 11:50:56 UTC

Issue with DistributedCache

Hi

I'm trying to modify the word count example
(http://wiki.apache.org/hadoop/WordCount) using the new API
(org.apache.hadoop.mapreduce.*). I run the job on a remote
pseudo-distributed cluster. It works fine with the old API, but when I
use the new one, I get this:


11/11/24 11:28:02 INFO mapred.JobClient: Task Id : attempt_201111241046_0005_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException: WordCountNewAPI$WordCountMapper
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:866)
	at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:199)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:719)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
	at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
	at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: java.lang.ClassNotFoundException: WordCountNewAPI$WordCountMapper
	at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:247)
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:819)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:864)
	... 8 more

The sources are in the attachment

Regards
Denis

Re: Passing data files via the distributed cache

Posted by Robert Evans <ev...@yahoo-inc.com>.
There is currently no way to delete the data from the cache when you are done.  It is garbage collected when the cache starts to fill up (in LRU order if you are on a newer release).  The DistributedCache.addCacheFile is modifying the JobConf behind the scenes for you.  If you want to dig into the details of what it is doing you can look at the source code for it.
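
To make "LRU order" concrete, here is a minimal sketch of least-recently-used eviction using only java.util.LinkedHashMap. It is an illustration of the eviction order, not Hadoop's actual cache code; the class name LruSketch, the entry-count capacity, and the file names are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy LRU cache: shows eviction order only, not how Hadoop implements it. */
class LruSketch<K, V> extends LinkedHashMap<K, V> {
    private final int capacity; // hypothetical limit, counted in entries

    LruSketch(int capacity) {
        // accessOrder = true: iteration runs from least to most recently used
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after each put; returning true drops the least recently used entry
        return size() > capacity;
    }

    public static void main(String[] args) {
        LruSketch<String, String> cache = new LruSketch<>(2);
        cache.put("a.dat", "job1");
        cache.put("b.dat", "job2");
        cache.get("a.dat");          // touching a.dat makes b.dat the eldest
        cache.put("c.dat", "job3");  // cache is over capacity, so b.dat is evicted
        System.out.println(cache.keySet()); // prints [a.dat, c.dat]
    }
}
```

The point of the sketch is that nothing is deleted when a job finishes; entries only leave when space is needed, and the ones untouched for longest go first.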

--Bobby Evans

On 11/28/11 4:46 AM, "Andy Doddington" <an...@doddington.net> wrote:

Thanks for that link Prashant - very useful.

Two brief follow-up questions:

1) Having put data in the cache, I would like to be a good citizen by deleting the data from the cache once
    I've finished - how do I do that?
2) Would it be simpler to pass the data as a value in the jobConf object?

Thanks,

        Andy D.



Re: Passing data files via the distributed cache

Posted by Andy Doddington <an...@doddington.net>.
Thanks for that link Prashant - very useful.

Two brief follow-up questions:

1) Having put data in the cache, I would like to be a good citizen by deleting the data from the cache once
    I’ve finished - how do I do that?
2) Would it be simpler to pass the data as a value in the jobConf object?

Thanks,

	Andy D.

On 25 Nov 2011, at 12:14, Prashant Kommireddi wrote:

> I believe you want to ship data to each node in your cluster before MR
> begins so the mappers can access files local to their machine. Hadoop
> tutorial on YDN has some good info on this.
> 
> http://developer.yahoo.com/hadoop/tutorial/module5.html#auxdata
> 
> -Prashant Kommireddi
> 


Re: Passing data files via the distributed cache

Posted by Prashant Kommireddi <pr...@gmail.com>.
I believe you want to ship data to each node in your cluster before MR
begins so the mappers can access files local to their machine. Hadoop
tutorial on YDN has some good info on this.

http://developer.yahoo.com/hadoop/tutorial/module5.html#auxdata

-Prashant Kommireddi

On Fri, Nov 25, 2011 at 1:05 AM, Andy Doddington <an...@doddington.net>wrote:

> I have a series of mappers that I would like to be passed data using the
> distributed cache mechanism. At the
> moment, I am using HDFS to pass the data, but this seems wasteful to me,
> since they are all reading the same data.
>
> Is there a piece of example code that shows how data files can be placed
> in the cache and accessed by mappers?
>
> Thanks,
>
>        Andy Doddington
>
>

Passing data files via the distributed cache

Posted by Andy Doddington <an...@doddington.net>.
I have a series of mappers that I would like to be passed data using the distributed cache mechanism. At the
moment, I am using HDFS to pass the data, but this seems wasteful to me, since they are all reading the same data.

Is there a piece of example code that shows how data files can be placed in the cache and accessed by mappers?

Thanks,

	Andy Doddington


Re: Issue with DistributedCache

Posted by Bejoy Ks <be...@gmail.com>.
My bad, I pasted the wrong file. It is updated now; I made a few tiny
modifications (commented in the code) and it worked fine for me.
http://pastebin.com/RDuZX7Qd

Alex,
Thanks a lot for pointing that out.

Regards
Bejoy.KS

On Thu, Nov 24, 2011 at 8:31 PM, Alexander C.H. Lorenz <
wget.null@googlemail.com> wrote:

> Hi,
>
> a typo?
> import com.bejoy.sampels.worcount.WordCountDriver;
> = wor_d_count ?
>
> - alex
>
> --
> Alexander Lorenz
> http://mapredit.blogspot.com
>
> *P **Think of the environment: please don't print this email unless you
> really need to.*
>

Re: Issue with DistributedCache

Posted by "Alexander C.H. Lorenz" <wg...@googlemail.com>.
Hi,

a typo?
import com.bejoy.sampels.worcount.WordCountDriver;
= wor_d_count ?

- alex

On Thu, Nov 24, 2011 at 3:45 PM, Bejoy Ks <be...@gmail.com> wrote:

> Hi Denis
>       I tried your code with out distributed cache locally and it worked
> fine for me. Please find it at
> http://pastebin.com/ki175YUx
>
> I echo Mike's words in submitting a map reduce jobs remotely. The remote
> machine can be your local PC or any utility server as Mike specified. What
> you need to have in remote machine is a replica of hadoop jars and
> configuration files same as that of your hadoop cluster. (If you don't have
> a remote util server set up then you can use your dev machine for the
> same). Just trigger the hadoop job  on local machine and the actual job
> would be submitted and running on your cluster based on the NN host and
> configuration parameters you have on your config files.
>
> Hope it helps!..
>
> Regards
> Bejoy.K.S
>



-- 
Alexander Lorenz
http://mapredit.blogspot.com

*P **Think of the environment: please don't print this email unless you
really need to.*

Re: Issue with DistributedCache

Posted by Bejoy Ks <be...@gmail.com>.
Hi Denis
       I tried your code locally without the distributed cache and it worked
fine for me. Please find it at
http://pastebin.com/ki175YUx

I echo Mike's words on submitting MapReduce jobs remotely. The remote
machine can be your local PC or any utility server, as Mike specified. What
you need on the remote machine is a copy of the Hadoop jars and
configuration files identical to those on your Hadoop cluster. (If you don't
have a remote utility server set up, you can use your dev machine for the
same purpose.) Just trigger the Hadoop job on the local machine and the
actual job will be submitted to and run on your cluster, based on the NN host
and configuration parameters in your config files.

Hope it helps!..

Regards
Bejoy.K.S

On Thu, Nov 24, 2011 at 7:09 PM, Michel Segel <mi...@hotmail.com>wrote:

> Denis...
>
> Sorry, you lost me.
>
> Just to make sure we're using the same terminology...
> The cluster is comprised of two types of nodes...
> The data nodes which run DN,TT, and if you have HBase, RS.
> Then there are control nodes which run you NN,SN, JT and if you run HBase,
> HM and ZKs ...
>
> Outside of the cluster we have machines set up with Hadoop installed but
> are not running any of the processes. They are where our users launch there
> jobs. We call them edge nodes. ( it's not a good idea to let users directly
> on the actual cluster.)
>
> Ok, having said all of that... You launch you job from the edge nodes...
> Your data sits in HDFS so you don't need distributed cache at all. Does
> that make sense?
> You job will run on the local machine, connect to the JT and then run.
>
> We set up the edge nodes so that all of the jars, config files are already
> set up for the users and we can better control access...
>
> Sent from a remote device. Please excuse any typos...
>
> Mike Segel
>

Re: Issue with DistributedCache

Posted by Michel Segel <mi...@hotmail.com>.
Denis...

Sorry, you lost me.

Just to make sure we're using the same terminology...
The cluster is comprised of two types of nodes...
The data nodes, which run DN, TT and, if you have HBase, RS.
Then there are control nodes, which run your NN, SN, JT and, if you run HBase, HM and ZKs ...

Outside of the cluster we have machines set up with Hadoop installed that are not running any of the processes. They are where our users launch their jobs. We call them edge nodes. (It's not a good idea to let users directly onto the actual cluster.)

Ok, having said all of that... You launch your job from the edge nodes... Your data sits in HDFS so you don't need distributed cache at all. Does that make sense?
Your job will run on the local machine, connect to the JT and then run.

We set up the edge nodes so that all of the jars and config files are already set up for the users and we can better control access...

Sent from a remote device. Please excuse any typos...

Mike Segel

On Nov 24, 2011, at 7:22 AM, Denis Kreis <de...@gmail.com> wrote:

> Without using the distributed cache i'm getting the same error. It's
> because i start the job from a remote client / programmatically
> 

Re: Issue with DistributedCache

Posted by Denis Kreis <de...@gmail.com>.
Without using the distributed cache I'm getting the same error. It's
because I start the job from a remote client / programmatically

2011/11/24 Michel Segel <mi...@hotmail.com>:
> Silly question... Why do you need to use the distributed cache for the word count program?
>  What are you trying to accomplish?
>
> I've only had to play with it for one project where we had to push out a bunch of c++ code to the nodes as part of a job...
>
> Sent from a remote device. Please excuse any typos...
>
> Mike Segel
>

Re: Issue with DistributedCache

Posted by Michel Segel <mi...@hotmail.com>.
Silly question... Why do you need to use the distributed cache for the word count program?
 What are you trying to accomplish?

I've only had to play with it for one project where we had to push out a bunch of C++ code to the nodes as part of a job...

Sent from a remote device. Please excuse any typos...

Mike Segel

On Nov 24, 2011, at 7:05 AM, Denis Kreis <de...@gmail.com> wrote:

> Hi Bejoy
> 
> 1. Old API:
> The Map and Reduce classes are the same as in the example, the main
> method is as follows
> 
> public static void main(String[] args) throws IOException,
> InterruptedException {
>        UserGroupInformation ugi =
> UserGroupInformation.createProxyUser("<remote user name>",
> UserGroupInformation.getLoginUser());
>        ugi.doAs(new PrivilegedExceptionAction<Void>() {
>            public Void run() throws Exception {
>                
>                JobConf conf = new JobConf(WordCount.class);
>                conf.setJobName("wordcount");
>                
>                conf.setOutputKeyClass(Text.class);
>                conf.setOutputValueClass(IntWritable.class);
>                
>                conf.setMapperClass(Map.class);
>                conf.setCombinerClass(Reduce.class);
>                conf.setReducerClass(Reduce.class);
>            
>                conf.setInputFormat(TextInputFormat.class);
>                conf.setOutputFormat(TextOutputFormat.class);
> 
>                FileInputFormat.setInputPaths(conf, new Path("<path to input dir>"));
>                FileOutputFormat.setOutputPath(conf, new Path("<path to
> output dir>"));
>                
>                conf.set("mapred.job.tracker", "<ip:8021>");
>                
>                FileSystem fs = FileSystem.get(new URI("hdfs://<ip>:8020"),
> new Configuration());
>                fs.mkdirs(new Path("<remote path>"));
>                fs.copyFromLocalFile(new Path("<local path>/test.jar"), new
> Path("<remote path>"));
>                
> 

Re: Issue with DistributedCache

Posted by Denis Kreis <de...@gmail.com>.
Hi Bejoy

1. Old API:
The Map and Reduce classes are the same as in the example, the main
method is as follows

public static void main(String[] args) throws IOException, InterruptedException {
    UserGroupInformation ugi = UserGroupInformation.createProxyUser("<remote user name>",
            UserGroupInformation.getLoginUser());
    ugi.doAs(new PrivilegedExceptionAction<Void>() {
        public Void run() throws Exception {

            JobConf conf = new JobConf(WordCount.class);
            conf.setJobName("wordcount");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            conf.setMapperClass(Map.class);
            conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(Reduce.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path("<path to input dir>"));
            FileOutputFormat.setOutputPath(conf, new Path("<path to output dir>"));

            conf.set("mapred.job.tracker", "<ip:8021>");

            FileSystem fs = FileSystem.get(new URI("hdfs://<ip>:8020"), new Configuration());
            fs.mkdirs(new Path("<remote path>"));
            fs.copyFromLocalFile(new Path("<local path>/test.jar"), new Path("<remote path>"));

            DistributedCache.addArchiveToClassPath(new Path("<remote path>/test.jar"), conf, fs);

            JobClient.runJob(conf);

            return null;
        }
    });
}
It works fine

2. New API:

public class WordCountNewAPI {
	
	public static class WordCountMapper extends Mapper<LongWritable,
Text, Text, IntWritable> {
		
		private final static IntWritable ONE = new IntWritable(1);
		private Text word = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				context.write(word, ONE);
			}
			
			super.map(key, value, context);
		}
		
	}
	
	public static class WordCountReducer extends Reducer<Text,
IntWritable, Text, IntWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			
			int sum = 0;
			Iterator<IntWritable> iter = values.iterator();
			while (iter.hasNext()) {
				sum += iter.next().get();
			}
			
			context.write(key, new IntWritable(sum));
			
			super.reduce(key, values, context);
		}
		
	}
	
	/**
	 * @param args
	 * @throws IOException
	 * @throws InterruptedException
	 */
	public static void main(String[] args) throws IOException,
InterruptedException {
		UserGroupInformation ugi =
UserGroupInformation.createProxyUser("<remote user name>",
UserGroupInformation.getLoginUser());
		ugi.doAs(new PrivilegedExceptionAction<Void>() {
            public Void run() throws Exception {
            	
            	Configuration conf = new Configuration();
            	conf.set("mapred.job.tracker", "<ip:8021>");
            	
            	Job job = new Job(conf, "wordcount");
            	
            	job.setJarByClass(WordCountNewAPI.class);
        		
            	job.setOutputKeyClass(Text.class);
            	job.setOutputValueClass(IntWritable.class);
            	
            	job.setMapperClass(WordCountMapper.class);
            	job.setCombinerClass(WordCountReducer.class);
            	job.setReducerClass(WordCountReducer.class);
            	
            	job.setInputFormatClass(TextInputFormat.class);
            	job.setOutputFormatClass(TextOutputFormat.class);
            	
            	FileInputFormat.setInputPaths(job, new Path("<path to input dir>"));
            	FileOutputFormat.setOutputPath(job, new Path("<path to output dir>"));
        		
        		FileSystem fs = FileSystem.get(new URI("hdfs://<ip>:8020"), new Configuration());
        		fs.mkdirs(new Path("<remote path>"));
        		fs.copyFromLocalFile(new Path("<local path>/test.jar"), new Path("<remote path>"));
        		
        		DistributedCache.addArchiveToClassPath(new Path("<remote path>/test.jar"), conf, fs);
        		
        		boolean b = job.waitForCompletion(true);
    			if (!b) {
    				throw new IOException("error with job!");
    			}
            	
            	return null;
            }
          });
	}

}
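
One thing that may be worth checking in the driver above (an assumption, not something confirmed in this thread): in this generation of the API, new Job(conf, ...) appears to keep its own copy of the Configuration, so anything set on conf afterwards, such as the DistributedCache call, might never reach the submitted job. The sketch below is not Hadoop code. Conf, JobLike and the key "demo.classpath.archives" are hypothetical stand-ins that only illustrate the copy-on-construction behaviour.

```java
import java.util.HashMap;
import java.util.Map;

class ConfCopyDemo {

    // Hypothetical stand-in for a configuration object (not the Hadoop class).
    static class Conf {
        private final Map<String, String> props = new HashMap<>();

        Conf() {}

        // Copy constructor: takes a snapshot of the other object's entries.
        Conf(Conf other) {
            props.putAll(other.props);
        }

        void set(String key, String value) {
            props.put(key, value);
        }

        String get(String key) {
            return props.get(key);
        }
    }

    // Hypothetical stand-in for a job that copies its configuration when constructed.
    static class JobLike {
        private final Conf conf;

        JobLike(Conf conf) {
            this.conf = new Conf(conf);
        }

        Conf getConfiguration() {
            return conf;
        }
    }

    // Returns what the job sees after a late set on the original conf,
    // and after a set made through the job's own configuration.
    static String[] observe() {
        Conf conf = new Conf();
        JobLike job = new JobLike(conf);

        // Set on the original object after the job was built: the job's copy is unaffected.
        conf.set("demo.classpath.archives", "test.jar");
        String afterLateSet = job.getConfiguration().get("demo.classpath.archives");

        // Set through the job's own configuration: visible to the job.
        job.getConfiguration().set("demo.classpath.archives", "test.jar");
        String afterJobSet = job.getConfiguration().get("demo.classpath.archives");

        return new String[] { afterLateSet, afterJobSet };
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println(seen[0]); // prints "null"
        System.out.println(seen[1]); // prints "test.jar"
    }
}
```

If the same holds for the real classes, the analogous change in the driver would be to pass job.getConfiguration() instead of conf to DistributedCache.addArchiveToClassPath, or to make that call before the Job is constructed. This is a suggestion to test, not a confirmed fix.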

2011/11/24 Bejoy Ks <be...@gmail.com>:
> Hi Denis
>       Unfortunately, the mailing list strips attachments, so it would be
> great if you could paste the source somewhere and share the URL. If the
> source is small enough, please include it in the message body.
>
> For a quick comparison, try checking your code against the following sample.
> I wrote it a while ago and it worked:
> http://kickstarthadoop.blogspot.com/2011/05/word-count-example-with-hadoop-020.html
>
> Hope it helps!
>
> Regards
> Bejoy.K.S
>

Re: Issue with DistributedCache

Posted by Bejoy Ks <be...@gmail.com>.
Hi Denis
       Unfortunately, the mailing list strips attachments, so it would be
great if you could paste the source somewhere and share the URL. If the
source is small enough, please include it in the message body.

For a quick comparison, try checking your code against the following sample.
I wrote it a while ago and it worked:
http://kickstarthadoop.blogspot.com/2011/05/word-count-example-with-hadoop-020.html

Hope it helps!

Regards
Bejoy.K.S
