Posted to mapreduce-user@hadoop.apache.org by Michael Forage <Mi...@livenation.co.uk> on 2013/01/11 16:30:49 UTC

Compile error using contrib.utils.join package with new mapreduce API

Hi

I'm using Hadoop 1.0.4 with the hadoop.mapreduce API, and I'm having problems compiling a simple class to implement a reduce-side data join of 2 files.
I'm trying to do this using contrib.utils.join and in Eclipse it all compiles fine other than:

job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);

...which both complain that the referenced class does not extend either Mapper<> or Reducer<>.
It's my understanding that, for this kind of join, they should instead extend DataJoinMapperBase and DataJoinReducerBase.

I've searched for a solution everywhere, but unfortunately all the examples I can find are based on the deprecated mapred API.
Assuming this package actually works with the new API, can anyone offer any advice?

Complete compile errors:

The method setMapperClass(Class<? extends Mapper>) in the type Job is not applicable for the arguments (Class<DataJoin.MapClass>)
The method setReducerClass(Class<? extends Reducer>) in the type Job is not applicable for the arguments (Class<DataJoin.Reduce>)

...and the code...

package JoinTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;

public class DataJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {

        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }


    public static class Reduce extends DataJoinReducerBase {

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i=0; i<values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "DataJoin");
        job.setJarByClass(DataJoin.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);

        //V3 set to Text
        job.setOutputFormatClass(TextOutputFormat.class);

        //Applies to mapper output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //job.set("mapred.textoutputformat.separator", ",");

        System.exit(job.waitForCompletion(true) ? 0 : 1);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                                 new DataJoin(),
                                 args);

        System.exit(res);
    }
}



Thanks

Mike


Re: Compile error using contrib.utils.join package with new mapreduce API

Posted by Mahesh Balija <ba...@gmail.com>.
Hi Mike,

            As far as I can see, DataJoinMapperBase and DataJoinReducerBase
implement the Mapper and Reducer interfaces from the old mapred package, and
because you are creating the job with the new API you get these compilation
errors.

            You could check whether equivalents of DataJoinMapperBase and
DataJoinReducerBase are available in the new API. Otherwise you will have to
rewrite your job in the old style using JobConf.

Best,
Mahesh Balija,
Calsoft Labs.
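
For reference, a minimal, untested sketch of such an old-style driver. The
class name DataJoinOldApi is invented for illustration; MapClass, Reduce and
TaggedWritable refer to the inner classes from the original post, which
already extend the old-API base classes:

package JoinTest;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DataJoinOldApi extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // JobConf replaces Job when the mapper/reducer come from the mapred API
        JobConf job = new JobConf(getConf(), DataJoinOldApi.class);
        job.setJobName("DataJoin");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // These now type-check, because DataJoinMapperBase/DataJoinReducerBase
        // implement org.apache.hadoop.mapred.Mapper/Reducer
        job.setMapperClass(DataJoin.MapClass.class);
        job.setReducerClass(DataJoin.Reduce.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        // the map output value is the tagged record, not plain Text
        job.setOutputValueClass(DataJoin.TaggedWritable.class);

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DataJoinOldApi(), args));
    }
}

Note that for an actual run, TaggedWritable would also need a public
no-argument constructor, since Hadoop instantiates Writables reflectively
when deserialising them.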



Re: Compile error using contrib.utils.join package with new mapreduce API

Posted by Hemanth Yamijala <yh...@thoughtworks.com>.
On the dev mailing list, Harsh pointed out that there is another join-related
package:
http://svn.apache.org/repos/asf/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/join/

This seems to be available in 2.x and trunk. Could you check if this provides
the functionality you require, so we at least know there is new API support
in later versions?

Thanks
Hemanth
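
For what it's worth, that package looks like the new-API counterpart of the
old map-side join framework. Below is a rough, untested sketch of how it is
typically wired up against Hadoop 2.x; the class NewApiJoin, its mapper, and
the argument paths are made up for illustration, and the framework requires
both inputs to be sorted and identically partitioned:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat;
import org.apache.hadoop.mapreduce.lib.join.TupleWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class NewApiJoin {

    // The join itself happens in the input format; the mapper just sees
    // the already-joined tuples, one Writable per composed input.
    public static class JoinMapper extends Mapper<Text, TupleWritable, Text, Text> {
        @Override
        protected void map(Text key, TupleWritable value, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(value.get(0) + "," + value.get(1)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "inner" keeps only keys that appear in both inputs
        conf.set("mapreduce.join.expr",
                 CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class,
                                              new Path(args[0]), new Path(args[1])));
        Job job = Job.getInstance(conf, "new-api join");
        job.setJarByClass(NewApiJoin.class);
        job.setInputFormatClass(CompositeInputFormat.class);
        job.setMapperClass(JoinMapper.class);
        job.setNumReduceTasks(0);  // pure map-side join
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}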


On Mon, Jan 14, 2013 at 7:45 PM, Hemanth Yamijala <yhemanth@thoughtworks.com> wrote:

> Hi,
>
> No. I didn't find any reference to a working sample. I also didn't find
> any JIRA that asks for a migration of this package to the new API. Not sure
> why. I have asked on the dev list.
>
> Thanks
> hemanth
>
>
> On Mon, Jan 14, 2013 at 6:25 PM, Michael Forage <Michael.Forage@livenation.co.uk> wrote:
>
>> Thanks Hemanth
>>
>> I appreciate your response.
>>
>> Did you find any working example of it in use? It looks to me like I'd
>> still be tied to the old API.
>>
>> Thanks
>> Mike
>>
>> From: Hemanth Yamijala [mailto:yhemanth@thoughtworks.com]
>> Sent: 14 January 2013 05:08
>> To: user@hadoop.apache.org
>> Subject: Re: Compile error using contrib.utils.join package with new mapreduce API
>>
>> Hi,
>>
>> The datajoin package has a class called DataJoinJob
>> (http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/contrib/utils/join/DataJoinJob.html)
>>
>> I think using this will help you get around the issue you are facing.
>>
>> From the source, this is the command line usage of the class:
>>
>> usage: DataJoinJob inputdirs outputdir map_input_file_format numofParts
>> mapper_class reducer_class map_output_value_class output_value_class
>> [maxNumOfValuesPerGroup [descriptionOfJob]]]
>>
>> Internally the class uses the old API to set the mapper and reducer
>> passed as arguments above.
>>
>> Thanks
>> hemanth
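
As an aside, going by that usage string, an invocation might look something
like the following. This is an untested guess: the contrib jar path and the
"text" input-format argument are assumptions, and the mapper/reducer/value
classes are the inner classes from the original post, which already extend
the old-API base classes and so should be usable as-is:

hadoop jar $HADOOP_HOME/contrib/datajoin/hadoop-datajoin-1.0.4.jar \
    org.apache.hadoop.contrib.utils.join.DataJoinJob \
    /user/mike/input /user/mike/output text 1 \
    'JoinTest.DataJoin$MapClass' 'JoinTest.DataJoin$Reduce' \
    'JoinTest.DataJoin$TaggedWritable' org.apache.hadoop.io.Text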

Re: Compile error using contrib.utils.join package with new mapreduce API

Posted by Hemanth Yamijala <yh...@thoughtworks.com>.
Hi,

No. I didn't find any reference to a working sample. I also didn't find any
JIRA that asks for a migration of this package to the new API. Not sure
why. I have asked on the dev list.

Thanks
hemanth


On Mon, Jan 14, 2013 at 6:25 PM, Michael Forage <
Michael.Forage@livenation.co.uk> wrote:

> Thanks Hemanth
>
> I appreciate your response
> Did you find any working example of it in use? It looks to me like I'd
> still be tied to the old API
>
> Thanks
> Mike
>
> From: Hemanth Yamijala [mailto:yhemanth@thoughtworks.com]
> Sent: 14 January 2013 05:08
> To: user@hadoop.apache.org
> Subject: Re: Compile error using contrib.utils.join package with new
> mapreduce API
>
> Hi,
>
> The datajoin package has a class called DataJoinJob (
> http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/contrib/utils/join/DataJoinJob.html
> )
>
> I think using this will help you get around the issue you are facing.
>
> From the source, this is the command line usage of the class:
>
> usage: DataJoinJob inputdirs outputdir map_input_file_format  numofParts
> mapper_class reducer_class map_output_value_class output_value_class
> [maxNumOfValuesPerGroup [descriptionOfJob]]]
>
> Internally the class uses the old API to set the mapper and reducer passed
> as arguments above.
>
> Thanks
> hemanth
>
> On Fri, Jan 11, 2013 at 9:00 PM, Michael Forage <
> Michael.Forage@livenation.co.uk> wrote:
>
> Hi
>
> I'm using Hadoop 1.0.4 and the hadoop.mapreduce API and am having
> problems compiling a simple class to implement a reduce-side data join
> of 2 files.
>
> I'm trying to do this using contrib.utils.join and in Eclipse it all
> compiles fine other than:
>
> job.setMapperClass(MapClass.class);
>       job.setReducerClass(Reduce.class);
>
> ...which both complain that the referenced class no longer extends either
> Mapper<> or Reducer<>
> It's my understanding that they should instead extend DataJoinMapperBase
> and DataJoinReducerBase
>
> I have searched for a solution everywhere but unfortunately, all the
> examples I can find are based on the deprecated mapred API.
> Assuming this package actually works with the new API, can anyone offer
> any advice?
>
> Complete compile errors:
>
> The method setMapperClass(Class<? extends Mapper>) in the type Job is not
> applicable for the arguments (Class<DataJoin.MapClass>)
> The method setReducerClass(Class<? extends Reducer>) in the type Job is
> not applicable for the arguments (Class<DataJoin.Reduce>)
>
> ...and the code...
>
> package JoinTest;
>
> import java.io.DataInput;
> import java.io.DataOutput;
> import java.io.IOException;
> import java.util.Iterator;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.Writable;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.Reducer;
> import org.apache.hadoop.mapreduce.Mapper.Context;
> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
> import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
> import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
> import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
>
> public class DataJoin extends Configured implements Tool {
>
>     public static class MapClass extends DataJoinMapperBase {
>
>         protected Text generateInputTag(String inputFile) {
>             String datasource = inputFile.split("-")[0];
>             return new Text(datasource);
>         }
>
>         protected Text generateGroupKey(TaggedMapOutput aRecord) {
>             String line = ((Text) aRecord.getData()).toString();
>             String[] tokens = line.split(",");
>             String groupKey = tokens[0];
>             return new Text(groupKey);
>         }
>
>         protected TaggedMapOutput generateTaggedMapOutput(Object value) {
>             TaggedWritable retv = new TaggedWritable((Text) value);
>             retv.setTag(this.inputTag);
>             return retv;
>         }
>     }
>
>     public static class Reduce extends DataJoinReducerBase {
>
>         protected TaggedMapOutput combine(Object[] tags, Object[] values) {
>             if (tags.length < 2) return null;
>             String joinedStr = "";
>             for (int i=0; i<values.length; i++) {
>                 if (i > 0) joinedStr += ",";
>                 TaggedWritable tw = (TaggedWritable) values[i];
>                 String line = ((Text) tw.getData()).toString();
>                 String[] tokens = line.split(",", 2);
>                 joinedStr += tokens[1];
>             }
>             TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
>             retv.setTag((Text) tags[0]);
>             return retv;
>         }
>     }
>
>     public static class TaggedWritable extends TaggedMapOutput {
>
>         private Writable data;
>
>         public TaggedWritable(Writable data) {
>             this.tag = new Text("");
>             this.data = data;
>         }
>
>         public Writable getData() {
>             return data;
>         }
>
>         public void write(DataOutput out) throws IOException {
>             this.tag.write(out);
>             this.data.write(out);
>         }
>
>         public void readFields(DataInput in) throws IOException {
>             this.tag.readFields(in);
>             this.data.readFields(in);
>         }
>     }
>
>     public int run(String[] args) throws Exception {
>         Configuration conf = getConf();
>
>         Job job = new Job(conf, "DataJoin");
>         job.setJarByClass(DataJoin.class);
>
>         Path in = new Path(args[0]);
>         Path out = new Path(args[1]);
>         FileInputFormat.setInputPaths(job, in);
>         FileOutputFormat.setOutputPath(job, out);
>
>         job.setJobName("DataJoin");
>         job.setMapperClass(MapClass.class);
>         job.setReducerClass(Reduce.class);
>
>         job.setInputFormatClass(TextInputFormat.class);
>
>         //V3 set to Text
>         job.setOutputFormatClass(TextOutputFormat.class);
>
>         //Applies to mapper output
>         job.setOutputKeyClass(Text.class);
>         job.setOutputValueClass(Text.class);
>
>         //job.set("mapred.textoutputformat.separator", ",");
>
>         System.exit(job.waitForCompletion(true)?0:1);
>
>         return 0;
>     }
>
>     public static void main(String[] args) throws Exception {
>         int res = ToolRunner.run(new Configuration(),
>                                  new DataJoin(),
>                                  args);
>
>         System.exit(res);
>     }
> }
>
> Thanks
>
> Mike
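
Since the contrib.utils.join classes were never ported, the same
reduce-side join can also be written directly against the new
org.apache.hadoop.mapreduce API, with no contrib dependency at all. What
follows is a minimal, untested sketch of that approach; the class and
method names are illustrative only (they are not from this thread), and
unlike combine() above it keeps keys that appear in only one input:

package JoinTest;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class NewApiJoin {

    // Tags each record with its data source, derived from the input file
    // name just as generateInputTag() does, and emits the join column as
    // the key. Assumes records look like "key,rest-of-record".
    public static class TagMapper
            extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text value, Context context)
                throws IOException, InterruptedException {
            String fileName =
                ((FileSplit) context.getInputSplit()).getPath().getName();
            String tag = fileName.split("-")[0];
            String[] tokens = value.toString().split(",", 2);
            context.write(new Text(tokens[0]),
                          new Text(tag + "," + tokens[1]));
        }
    }

    // Concatenates the payloads of all records sharing a key. Unlike
    // combine(), there is no tags.length check here, so keys present in
    // only one source are kept (an outer rather than an inner join).
    public static class JoinReducer
            extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            StringBuilder joined = new StringBuilder();
            for (Text value : values) {
                String[] tokens = value.toString().split(",", 2); // drop tag
                if (joined.length() > 0) joined.append(",");
                joined.append(tokens[1]);
            }
            context.write(key, new Text(joined.toString()));
        }
    }
}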

RE: Compile error using contrib.utils.join package with new mapreduce API

Posted by Michael Forage <Mi...@livenation.co.uk>.
Thanks Hemanth

I appreciate your response
Did you find any working example of it in use? It looks to me like I'd still be tied to the old API
Thanks
Mike
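
"Tied to the old API" here means, concretely, that the driver has to be
written with JobConf and JobClient: DataJoinMapperBase and
DataJoinReducerBase implement the org.apache.hadoop.mapred interfaces
(via JobBase), which is exactly why the new Job.setMapperClass rejects
them. A rough, untested sketch of such a driver, assuming the DataJoin
classes from the original post are on the classpath:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class OldApiDriver {
    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(new Configuration(), DataJoin.class);
        job.setJobName("DataJoin");

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // These compile because the old-API setMapperClass/setReducerClass
        // expect org.apache.hadoop.mapred.Mapper/Reducer, which the
        // contrib base classes implement.
        job.setMapperClass(DataJoin.MapClass.class);
        job.setReducerClass(DataJoin.Reduce.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        // The value travelling through the shuffle is the tagged record.
        job.setOutputValueClass(DataJoin.TaggedWritable.class);

        JobClient.runJob(job);
    }
}

One runtime caveat: TaggedWritable as posted has no no-argument
constructor, which Hadoop needs in order to deserialize it on the reduce
side (and readFields needs data to be non-null), so a default constructor
that also initializes data would still be required.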

From: Hemanth Yamijala [mailto:yhemanth@thoughtworks.com]
Sent: 14 January 2013 05:08
To: user@hadoop.apache.org
Subject: Re: Compile error using contrib.utils.join package with new mapreduce API

Hi,

The datajoin package has a class called DataJoinJob (http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/contrib/utils/join/DataJoinJob.html)

I think using this will help you get around the issue you are facing.

From the source, this is the command line usage of the class:

usage: DataJoinJob inputdirs outputdir map_input_file_format  numofParts mapper_class reducer_class map_output_value_class output_value_class [maxNumOfValuesPerGroup [descriptionOfJob]]]

Internally the class uses the old API to set the mapper and reducer passed as arguments above.

Thanks
hemanth



On Fri, Jan 11, 2013 at 9:00 PM, Michael Forage <Mi...@livenation.co.uk> wrote:
Hi

I'm using Hadoop 1.0.4 and the hadoop.mapreduce API and am having problems compiling a simple class to implement a reduce-side data join of 2 files.
I'm trying to do this using contrib.utils.join and in Eclipse it all compiles fine other than:

job.setMapperClass(MapClass.class);
      job.setReducerClass(Reduce.class);

...which both complain that the referenced class no longer extends either Mapper<> or Reducer<>
It's my understanding that they should instead extend DataJoinMapperBase and DataJoinReducerBase

I have searched for a solution everywhere but unfortunately, all the examples I can find are based on the deprecated mapred API.
Assuming this package actually works with the new API, can anyone offer any advice?

Complete compile errors:

The method setMapperClass(Class<? extends Mapper>) in the type Job is not applicable for the arguments (Class<DataJoin.MapClass>)
The method setReducerClass(Class<? extends Reducer>) in the type Job is not applicable for the arguments (Class<DataJoin.Reduce>)

...and the code...

package JoinTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;

public class DataJoin extends Configured implements Tool {

      public static class MapClass extends DataJoinMapperBase {

        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }


    public static class Reduce extends DataJoinReducerBase {

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i=0; i<values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "DataJoin");
            job.setJarByClass(DataJoin.class);

            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job,  in);
            FileOutputFormat.setOutputPath(job,  out);


job.setJobName("DataJoin");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);

            //V3 set to Text
            job.setOutputFormatClass(TextOutputFormat.class);

            //Applies to mapper output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //job.set("mapred.textoutputformat.separator", ",");

            System.exit(job.waitForCompletion(true)?0:1);

            return 0;

    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                                 new DataJoin(),
                                 args);

        System.exit(res);
    }
}



Thanks

Mike



RE: Compile error using contrib.utils.join package with new mapreduce API

Posted by Michael Forage <Mi...@livenation.co.uk>.
Thanks Hemanth

I appreciate your response
Did you find any working example of it in use? It looks to me like I'd still be tied to the old API
Thanks
Mike

From: Hemanth Yamijala [mailto:yhemanth@thoughtworks.com]
Sent: 14 January 2013 05:08
To: user@hadoop.apache.org
Subject: Re: Compile error using contrib.utils.join package with new mapreduce API

Hi,

The datajoin package has a class called DataJoinJob (http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/contrib/utils/join/DataJoinJob.html)

I think using this will help you get around the issue you are facing.

>From the source, this is the command line usage of the class:

usage: DataJoinJob inputdirs outputdir map_input_file_format  numofParts mapper_class reducer_class map_output_value_class output_value_class [maxNumOfValuesPerGroup [descriptionOfJob]]]

Internally the class uses the old API to set the mapper and reducer passed as arguments above.

Thanks
hemanth



On Fri, Jan 11, 2013 at 9:00 PM, Michael Forage <Mi...@livenation.co.uk>> wrote:
Hi

I'm using Hadoop 1.0.4 and using the hadoop.mapreduce API having problems compiling a simple class to implement a reduce-side data join of 2 files.
I'm trying to do this using contrib.utils.join and in Eclipse it all compiles fine other than:

job.setMapperClass(MapClass.class);
      job.setReducerClass(Reduce.class);

...which both complain that the referenced class no longer extends either Mapper<> or Reducer<>
It's my understanding that for what they should instead extend DataJoinMapperBase and DataJoinReducerBase in order

Have searched for a solution everywhere  but unfortunately, all the examples I can find are based on the deprecated mapred API.
Assuming this package actually works with the new API, can anyone offer any advice?

Complete compile errors:

The method setMapperClass(Class<? extends Mapper>) in the type Job is not applicable for the arguments (Class<DataJoin.MapClass>)
The method setReducerClass(Class<? extends Reducer>) in the type Job is not applicable for the arguments (Class<DataJoin.Reduce>)

...and the code...

package JoinTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;

public class DataJoin extends Configured implements Tool {

      public static class MapClass extends DataJoinMapperBase {

        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }


    public static class Reduce extends DataJoinReducerBase {

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i=0; i<values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "DataJoin");
            job.setJarByClass(DataJoin.class);

            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job,  in);
            FileOutputFormat.setOutputPath(job,  out);


job.setJobName("DataJoin");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);

            //V3 set to Text
            job.setOutputFormatClass(TextOutputFormat.class);

            //Applies to mapper output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //job.set("mapred.textoutputformat.separator", ",");

            System.exit(job.waitForCompletion(true)?0:1);

            return 0;

    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                                 new DataJoin(),
                                 args);

        System.exit(res);
    }
}



Thanks

Mike



RE: Compile error using contrib.utils.join package with new mapreduce API

Posted by Michael Forage <Mi...@livenation.co.uk>.
Thanks Hemanth

I appreciate your response
Did you find any working example of it in use? It looks to me like I'd still be tied to the old API
Thanks
Mike

From: Hemanth Yamijala [mailto:yhemanth@thoughtworks.com]
Sent: 14 January 2013 05:08
To: user@hadoop.apache.org
Subject: Re: Compile error using contrib.utils.join package with new mapreduce API

Hi,

The datajoin package has a class called DataJoinJob (http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/contrib/utils/join/DataJoinJob.html)

I think using this will help you get around the issue you are facing.

>From the source, this is the command line usage of the class:

usage: DataJoinJob inputdirs outputdir map_input_file_format  numofParts mapper_class reducer_class map_output_value_class output_value_class [maxNumOfValuesPerGroup [descriptionOfJob]]]

Internally the class uses the old API to set the mapper and reducer passed as arguments above.

Thanks
hemanth



On Fri, Jan 11, 2013 at 9:00 PM, Michael Forage <Mi...@livenation.co.uk>> wrote:
Hi

I'm using Hadoop 1.0.4 and using the hadoop.mapreduce API having problems compiling a simple class to implement a reduce-side data join of 2 files.
I'm trying to do this using contrib.utils.join and in Eclipse it all compiles fine other than:

job.setMapperClass(MapClass.class);
      job.setReducerClass(Reduce.class);

...which both complain that the referenced class no longer extends either Mapper<> or Reducer<>
It's my understanding that for what they should instead extend DataJoinMapperBase and DataJoinReducerBase in order

Have searched for a solution everywhere  but unfortunately, all the examples I can find are based on the deprecated mapred API.
Assuming this package actually works with the new API, can anyone offer any advice?

Complete compile errors:

The method setMapperClass(Class<? extends Mapper>) in the type Job is not applicable for the arguments (Class<DataJoin.MapClass>)
The method setReducerClass(Class<? extends Reducer>) in the type Job is not applicable for the arguments (Class<DataJoin.Reduce>)

...and the code...

package JoinTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;

public class DataJoin extends Configured implements Tool {

      public static class MapClass extends DataJoinMapperBase {

        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }


    public static class Reduce extends DataJoinReducerBase {

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i=0; i<values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "DataJoin");
            job.setJarByClass(DataJoin.class);

            Path in = new Path(args[0]);
            Path out = new Path(args[1]);
            FileInputFormat.setInputPaths(job,  in);
            FileOutputFormat.setOutputPath(job,  out);


job.setJobName("DataJoin");
            job.setMapperClass(MapClass.class);
            job.setReducerClass(Reduce.class);

            job.setInputFormatClass(TextInputFormat.class);

            //V3 set to Text
            job.setOutputFormatClass(TextOutputFormat.class);

            //Applies to mapper output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            //job.set("mapred.textoutputformat.separator", ",");

            System.exit(job.waitForCompletion(true)?0:1);

            return 0;

    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                                 new DataJoin(),
                                 args);

        System.exit(res);
    }
}



Thanks

Mike



RE: Compile error using contrib.utils.join package with new mapreduce API

Posted by Michael Forage <Mi...@livenation.co.uk>.
Thanks Hemanth

I appreciate your response
Did you find any working example of it in use? It looks to me like I'd still be tied to the old API
Thanks
Mike

From: Hemanth Yamijala [mailto:yhemanth@thoughtworks.com]
Sent: 14 January 2013 05:08
To: user@hadoop.apache.org
Subject: Re: Compile error using contrib.utils.join package with new mapreduce API

Hi,

The datajoin package has a class called DataJoinJob (http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/contrib/utils/join/DataJoinJob.html)

I think using this will help you get around the issue you are facing.

>From the source, this is the command line usage of the class:

usage: DataJoinJob inputdirs outputdir map_input_file_format  numofParts mapper_class reducer_class map_output_value_class output_value_class [maxNumOfValuesPerGroup [descriptionOfJob]]]

Internally the class uses the old API to set the mapper and reducer passed as arguments above.

Thanks
hemanth



On Fri, Jan 11, 2013 at 9:00 PM, Michael Forage <Mi...@livenation.co.uk>> wrote:
Hi

I'm using Hadoop 1.0.4 and using the hadoop.mapreduce API having problems compiling a simple class to implement a reduce-side data join of 2 files.
I'm trying to do this using contrib.utils.join and in Eclipse it all compiles fine other than:

job.setMapperClass(MapClass.class);
      job.setReducerClass(Reduce.class);

...which both complain that the referenced class no longer extends either Mapper<> or Reducer<>
It's my understanding that for what they should instead extend DataJoinMapperBase and DataJoinReducerBase in order

Have searched for a solution everywhere  but unfortunately, all the examples I can find are based on the deprecated mapred API.
Assuming this package actually works with the new API, can anyone offer any advice?

Complete compile errors:

The method setMapperClass(Class<? extends Mapper>) in the type Job is not applicable for the arguments (Class<DataJoin.MapClass>)
The method setReducerClass(Class<? extends Reducer>) in the type Job is not applicable for the arguments (Class<DataJoin.Reduce>)

...and the code...

package JoinTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;

public class DataJoin extends Configured implements Tool {

      public static class MapClass extends DataJoinMapperBase {

        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }


    public static class Reduce extends DataJoinReducerBase {

        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i=0; i<values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        Job job = new Job(conf, "DataJoin");
        job.setJarByClass(DataJoin.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);

        //V3 set to Text
        job.setOutputFormatClass(TextOutputFormat.class);

        //Applies to mapper output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //job.set("mapred.textoutputformat.separator", ",");

        System.exit(job.waitForCompletion(true) ? 0 : 1);

        return 0;
    }
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                                 new DataJoin(),
                                 args);

        System.exit(res);
    }
}



Thanks

Mike



Re: Compile error using contrib.utils.join package with new mapreduce API

Posted by Hemanth Yamijala <yh...@thoughtworks.com>.
Hi,

The datajoin package has a class called DataJoinJob (
http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/contrib/utils/join/DataJoinJob.html
)

I think using this will help you get around the issue you are facing.

From the source, this is the command-line usage of the class:

usage: DataJoinJob inputdirs outputdir map_input_file_format  numofParts
mapper_class reducer_class map_output_value_class output_value_class
[maxNumOfValuesPerGroup [descriptionOfJob]]]

Internally the class uses the old API to set the mapper and reducer passed
as arguments above.
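
For illustration, here is a minimal, untested sketch of a driver that feeds
those same positional arguments to DataJoinJob from Java instead of the
command line. The paths and the single reduce task are placeholders, and the
createDataJoinJob/runJob calls are my reading of the 1.0.4 source, so please
verify against your copy:

import org.apache.hadoop.contrib.utils.join.DataJoinJob;
import org.apache.hadoop.mapred.JobConf;

public class DataJoinDriver {
    public static void main(String[] args) throws Exception {
        // Positional arguments matching the usage string quoted above.
        // The input/output paths and "1" reduce task are placeholders.
        String[] jobArgs = {
            "/user/mike/input-a,/user/mike/input-b",     // inputdirs
            "/user/mike/joined",                         // outputdir
            "org.apache.hadoop.mapred.TextInputFormat",  // map_input_file_format
            "1",                                         // numofParts
            "JoinTest.DataJoin$MapClass",                // mapper_class
            "JoinTest.DataJoin$Reduce",                  // reducer_class
            "JoinTest.DataJoin$TaggedWritable",          // map_output_value_class
            "org.apache.hadoop.io.Text"                  // output_value_class
        };
        // DataJoinJob builds an old-API JobConf from these arguments...
        JobConf job = DataJoinJob.createDataJoinJob(jobArgs);
        // ...and submits it, returning true on success.
        boolean success = DataJoinJob.runJob(job);
        System.exit(success ? 0 : 1);
    }
}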

Thanks
hemanth




On Fri, Jan 11, 2013 at 9:00 PM, Michael Forage <
Michael.Forage@livenation.co.uk> wrote:

> [original message quoted in full; snipped]

Re: Compile error using contrib.utils.join package with new mapreduce API

Posted by Mahesh Balija <ba...@gmail.com>.
Hi Mike,

            As far as I can see, DataJoinMapperBase and DataJoinReducerBase
implement the Mapper and Reducer interfaces from the mapred package, and
because you are creating the job with the new API you are getting these
compilation errors.

            You should check whether DataJoinMapperBase/DataJoinReducerBase
are available in the new API or not. Otherwise, you should rewrite your job
in the old fashion using JobConf.
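
To make that second option concrete, here is a rough, untested sketch of how
the run() method might look rewritten against the old mapred API. This is
just one way to do it; using TaggedWritable as the output value class is my
assumption, since that is what combine() returns:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public int run(String[] args) throws Exception {
    // JobConf replaces mapreduce.Job when using the old API.
    JobConf job = new JobConf(getConf(), DataJoin.class);
    job.setJobName("DataJoin");

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // These now type-check: DataJoinMapperBase and DataJoinReducerBase
    // implement the mapred Mapper/Reducer interfaces.
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormat(TextInputFormat.class);
    job.setOutputFormat(TextOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    // The datajoin classes pass TaggedWritable values between map and reduce.
    job.setOutputValueClass(TaggedWritable.class);

    JobClient.runJob(job);
    return 0;
}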

Best,
Mahesh Balija,
Calsoft Labs.


On Fri, Jan 11, 2013 at 9:00 PM, Michael Forage <
Michael.Forage@livenation.co.uk> wrote:

> [original message quoted in full; snipped]
