Posted to user@avro.apache.org by David Ginzburg <da...@gmail.com> on 2015/02/22 04:55:27 UTC
Avromultiple output
Hi,
I am trying to run an MR job on EMR with AvroMultipleOutputs.
When running with an AMI that ships Hadoop 2.2/2.5 I get the following exception:
Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
I read that this is related to incompatible Hadoop versions, so I switched AMIs.
When running with an AMI that ships Hadoop 1.0.3 I get the following exception instead:
java.lang.NullPointerException
    at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
The driver code is:

    Job job = new Job(getConf(), "myad");
    job.setOutputValueClass(NullWritable.class); // overridden below by UberRecord.class
    job.setJarByClass(myAdTextLineMapper.class);

    Path inputOflineFiles = new Path(args[0]);
    Path inputOfUbberFiles = new Path(args[1]); // not used below
    FileInputFormat.setInputPaths(job, inputOflineFiles);

    job.setMapperClass(myAdTextLineMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(UberRecord.class);

    job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
    AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);

    job.setReducerClass(myAdReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(UberRecord.class);
    job.setNumReduceTasks(2);

    String baseoutputFolder = args[2];
    job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER, baseoutputFolder);

    LazyOutputFormat.setOutputFormatClass(job, AvroSequenceFileOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
    return job.waitForCompletion(true) ? 0 : 1;
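A side note on the second stack trace: a NullPointerException from
SerializationFactory.getSerializer is the symptom of no serializer being
registered for the map output types, and UberRecord is an Avro record, not a
Writable. A sketch of the extra driver configuration that declares Avro
map-output schemas (this assumes the mapper is also changed to emit
AvroKey<CharSequence>/AvroValue<UberRecord> to match):

    // Sketch, not in the original driver: declares the map output schemas,
    // which also registers Avro serialization for the shuffle.
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputValueSchema(job, UberRecord.SCHEMA$);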
The mapper and reducer:

    @Override
    public void setup(Context ctx) {
        ubp = new UberRecordProcessor();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            handleLineinMap(value); // sets the ub field when the line parses
            if (ub != null) {
                context.write(new Text(ub.getAuctionId().toString()), ub);
                context.getCounter("myAd", "myAdTextLineMapper").increment(1);
            } else {
                context.getCounter("myAd", "myAdTextLineMapperNull").increment(1);
            }
        } catch (Exception e) {
            context.getCounter("myAd", "myAdTextLineMapperError").increment(1);
            logger.warn("could not parse line " + value.toString(), e);
        }
    }
    public class myAdReducer extends
            Reducer<Text, UberRecord, AvroKey<CharSequence>, AvroValue<UberRecord>> {

        private static Logger logger = Logger.getLogger(myAdReducer.class);
        public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
        AvroMultipleOutputs amos;
        MultipleOutputs<Text, UberRecord> outputs;
        UberRecordProcessor ubp = new UberRecordProcessor();
        // "year=%s/month=%s/day=%s/hour=%s"
        private String baseOutputPath;
        private long reduceAttemptUniqueIdentifier = System.currentTimeMillis();
        // 2015-02-01T18:00:25.673Z
        static DateTimeFormatter dateformat = DateTimeFormat
                .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            amos = new AvroMultipleOutputs(context);
            baseOutputPath = context.getConfiguration().get(BASE_OUTPUT_FOLDER);
        }

        @Override
        protected void reduce(Text key, Iterable<UberRecord> values, Context context)
                throws IOException, InterruptedException {
            try {
                UberRecord ub = new UberRecord();
                for (UberRecord ubi : values) {
                    // enrich
                    if (ubi.getExchange() == null) {
                        continue;
                    }
                    BaseBidRequestEnricher enc = BaseBidRequestEnricher
                            .getEnricher(ubi.getExchange().toString());
                    enc.enrich(ubi);
                    ub = mergeUB(ub, ubi);
                }
                logger.info("Writing UberRecord [" + ub.toString() + "]");
                String partition = getPartition(ub);
                amos.write(new AvroKey<CharSequence>(key.toString()),
                        new AvroValue<UberRecord>(ub),
                        baseOutputPath + "/" + partition + "/p"
                                + reduceAttemptUniqueIdentifier);
            } catch (Exception e) {
                logger.error("failed to reduce key " + key, e); // was a bare printStackTrace
            }
        }

        // Field-by-field merge: copy any field that is still null in dest from src,
        // with special handling for the nested engageData record.
        public UberRecord mergeUB(UberRecord dest, UberRecord src) {
            List<Field> fields = UberRecord.getClassSchema().getFields();
            List<Field> engFields = EngageData.getClassSchema().getFields();
            for (Field field : fields) {
                if (field.name().equals("engageData")
                        && dest.getEngageData() != null) {
                    EngageData mergedEng = dest.getEngageData();
                    for (Field engField : engFields) {
                        if (dest.getEngageData().get(engField.name()) == null) {
                            mergedEng.put(engField.name(),
                                    src.getEngageData().get(engField.name()));
                        }
                    }
                    dest.setEngageData(mergedEng);
                } else {
                    if (dest.get(field.name()) == null) {
                        dest.put(field.name(), src.get(field.name()));
                    }
                }
            }
            return dest;
        }
    }
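One thing worth noting: the reducer above never closes amos. AvroMultipleOutputs
buffers a record writer per output path, and without a close() those files are
typically never flushed or committed. A minimal cleanup override (a sketch,
using the amos field from the code above):

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // close() flushes and commits every writer opened by amos.write(...)
        amos.close();
    }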
Re: Avromultiple output
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
Hadoop packaging changed from 1.x to 2.x; e.g. hadoop-core is no longer valid
in 2.x.
Use profiles like these (hadoop1, hadoop2):
<profile>
<id>hadoop1</id>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.0.1.3.0.0-107</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.hadoop.compression</groupId>
<artifactId>hadoop-gpl-compression</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.12.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.6</version>
<classifier>hadoop1</classifier>
</dependency>
</dependencies>
</dependencyManagement>
</profile>
<profile>
<id>hadoop2</id>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.hadoop.compression.lzo</groupId>
<artifactId>hadoop-lzo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.12.0.2.0.6.0-76</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.6</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</profile>
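With profiles like these the matching Hadoop/Avro stack is picked at build
time, for example (assuming the profile ids above):

mvn clean package -Phadoop2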
--
Deepak
Re: Avromultiple output
Posted by Artem Ervits <ar...@gmail.com>.
Hadoop-core is now hadoop-common
Artem Ervits
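On Hadoop 2.x that means depending on something like the following instead (a
sketch; the version shown is illustrative, matching the Amazon 2.4.0
distribution mentioned elsewhere in the thread):

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>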
Re: Avromultiple output
Posted by David Ginzburg <da...@gmail.com>.
I am running on EMR, AMI version 3.3.1, Hadoop distribution Amazon 2.4.0.
The Hadoop jars are provided:
<hadoop.version>2.5.0-mr1-cdh5.2.1</hadoop.version>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>18.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2-beta</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.1.0</version>
<classifier>hadoop2</classifier>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
<version>0.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version><!-- 2.5.0-mr1-cdh5.2.1 -->
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
<exclusion>
<artifactId>avro</artifactId>
<groupId>org.apache.avro</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<artifactId>avro-mapred</artifactId>
<groupId>org.apache.avro</groupId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.mrunit</groupId>
<artifactId>mrunit</artifactId>
<version>1.0.0</version>
<classifier>hadoop1</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>org.json</artifactId>
<version>chargebee-1.0</version>
</dependency>
<dependency>
<groupId>com.hadoop.gplcompression</groupId>
<artifactId>hadoop-lzo</artifactId>
<version>0.4.19</version>
</dependency>
<dependency>
<groupId>net.sf.uadetector</groupId>
<artifactId>uadetector-resources</artifactId>
<version>2014.04</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
</dependencies>
Re: Avromultiple output
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
Share your Avro dependencies (versions) in case you're using Maven, and your
Hadoop dependencies (versions).
--
Deepak
Re: Avromultiple output
Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>,
de...@gmail.com.
Check your Hadoop version. In older versions org.apache.hadoop.mapreduce.JobContext
was a class; in newer ones it is an interface, which is exactly the mismatch the
error message is complaining about.
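A quick way to see which flavor is on the classpath (a diagnostic sketch, not
from the original thread):

    // Prints true on Hadoop 2.x, where JobContext is an interface,
    // and false on 1.x, where it is a class.
    System.out.println(org.apache.hadoop.mapreduce.JobContext.class.isInterface());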
On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <da...@gmail.com>
wrote:
> Thank you for the answer.
>
> Tried but the exception
> * Error: Found interface org.apache.hadoop.mapreduce.JobContext, but class
> was expected*
> persists
>
> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <ar...@gmail.com>
> wrote:
>
>> try this
>>
>> Job job = Job.getInstance(conf);
>> Job.setName(name);
>>
>> Artem Ervits
>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <da...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am trying to run an MR job on emr with AvromultipleOutput
>>>
>>>
>>>
>>>
>>> I get the following exception when running with AMI with hadoop 2.2 2.5
>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
>>> expected
>>>
>>> I read it is related to incompatible hadoop versions, So I modified
>>>
>>> When running with AMI with hadoop 103 I get the following exception:
>>>
>>> java.lang.NullPointerException
>>> at
>>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>> at
>>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>> at
>>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>> at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>> at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>> at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:415)
>>> at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>> at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>>
>>>
>>> The driver code is
>>>
>>> Job job = new Job(getConf(), "myad");
>>>
>>>
>>>
>>> job.setOutputValueClass(NullWritable.class);
>>>
>>>
>>> job.setJarByClass(myAdTextLineMapper.class);
>>> Path inputOflineFiles = new Path(args[0]);
>>> Path inputOfUbberFiles = new Path(args[1]);
>>>
>>> FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>
>>> job.setMapperClass(myAdTextLineMapper.class);
>>> job.setMapOutputKeyClass(Text.class);
>>> job.setMapOutputValueClass(UberRecord.class);
>>>
>>> job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>> AvroJob.setOutputKeySchema(job,
>>> Schema.create(Schema.Type.STRING));
>>> AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>
>>>
>>> job.setReducerClass(myAdReducer.class);
>>> job.setOutputKeyClass(Text.class);
>>> job.setOutputValueClass(UberRecord.class);
>>> job.setNumReduceTasks(2);
>>> String baseoutputFolder = args[2];
>>> job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>>> baseoutputFolder);
>>> ;
>>>
>>> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>>>
>>> FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>>> return job.waitForCompletion(true) ? 0 : 1;
>>>
>>>
>>> the mapper and reducers
>>> @Override
>>> public void setup(Context ctx) {
>>>
>>> ubp = new UberRecordProcessor();
>>> }
>>>
>>> @Override
>>> protected void map(LongWritable key, Text value, Context context)
>>> throws IOException, InterruptedException {
>>> try {
>>> handleLineinMap(value);
>>> if(ub!=null){
>>> context.write(new Text(ub.getAuctionId().toString()),
>>> ub);
>>> context.getCounter("myAd",
>>> "myAdTextLineMapper").increment(1);
>>> }else{
>>> context.getCounter("myAd",
>>> "myAdTextLineMapperNull").increment(1);
>>> }
>>> } catch (Exception e) {
>>> context.getCounter("myAd",
>>> "myAdTextLineMapperError").increment(1);
>>> logger.warn("could not parse line "+value.toString(),e);
>>>
>>>
>>> }
>>> }
>>>
>>> public class myAdReducer extends
>>> Reducer<Text, UberRecord, AvroKey<CharSequence>,
>>> AvroValue<UberRecord>> {
>>>
>>> private static Logger logger = Logger.getLogger(myAdReducer.class);
>>> public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>>> AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
>>> UberRecordProcessor ubp = new UberRecordProcessor();
>>> // "year=%s/month=%s/day=%s/hour=%s"
>>> private String baseOutputPath;
>>> private long reduceAttemptUniqueIdentifier =
>>> System.currentTimeMillis();
>>>
>>> // 2015-02-01T18:00:25.673Z
>>> static DateTimeFormatter dateformat = DateTimeFormat
>>> .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>
>>> @Override
>>> protected void setup(Context context) throws IOException,
>>> InterruptedException {
>>>
>>> amos = new AvroMultipleOutputs(context);
>>> baseOutputPath =
>>> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>
>>> }
>>>
>>> @Override
>>> protected void reduce(Text key, Iterable<UberRecord> values, Context
>>> context)
>>> throws IOException, InterruptedException {
>>>
>>> try {
>>> UberRecord ub = new UberRecord();
>>> for (UberRecord ubi : values) {
>>> // enrich
>>> if (ubi.getExchange() == null) {
>>> continue;
>>> }
>>> BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>> .getEnricher(ubi.getExchange().toString());
>>> enc.enrich(ubi);
>>> ub = mergeUB(ub, ubi);
>>> }
>>> logger.info("Writing UberRecord [" + ub.toString() + "]");
>>> String partition = getPartition(ub);
>>>
>>> // context.write();
>>> // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>> amos.write(new AvroKey<CharSequence>(key.toString()),
>>> new AvroValue<UberRecord>(ub), baseOutputPath + "/"
>>> + partition + "/p" +
>>> reduceAttemptUniqueIdentifier);
>>> } catch (Exception e) {
>>> // TODO Auto-generated catch block
>>> e.printStackTrace();
>>> }
>>> }
>>>
>>> public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>     List<Field> fields = UberRecord.getClassSchema().getFields();
>>>     List<Field> engFields = EngageData.getClassSchema().getFields();
>>>     for (Field field : fields) {
>>>         if (field.name().equals("engageData")
>>>                 && dest.getEngageData() != null) {
>>>             EngageData mergedEng = dest.getEngageData();
>>>             // guard against records without engageData, which would
>>>             // otherwise throw a NullPointerException below
>>>             if (src.getEngageData() != null) {
>>>                 for (Field engField : engFields) {
>>>                     if (mergedEng.get(engField.name()) == null) {
>>>                         mergedEng.put(engField.name(),
>>>                                 src.getEngageData().get(engField.name()));
>>>                     }
>>>                 }
>>>             }
>>>             dest.setEngageData(mergedEng);
>>>         } else if (dest.get(field.name()) == null) {
>>>             dest.put(field.name(), src.get(field.name()));
>>>         }
>>>     }
>>>     return dest;
>>> }
--
Deepak
Re: Avromultiple output
Posted by David Ginzburg <da...@gmail.com>.
Thank you for the answer.
I tried it, but the exception
*Error: Found interface org.apache.hadoop.mapreduce.JobContext, but class was
expected*
still persists.
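
From what I have read, this error means an Avro MapReduce jar compiled
against Hadoop 1 ended up on the classpath of a Hadoop 2 cluster: JobContext
was a class in Hadoop 1 but is an interface in Hadoop 2, so a jar built
against one cannot run against the other. A sketch of what should select a
compatible build, assuming the job is built with Maven (the 1.7.7 version
below is an assumption; use whatever Avro version the project is already on):

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>1.7.7</version>
      <!-- the hadoop2 classifier selects the build compiled against the
           Hadoop 2 mapreduce API; without it the hadoop1 build may be
           picked up and fail with "Found interface ... but class was
           expected" -->
      <classifier>hadoop2</classifier>
    </dependency>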
On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <ar...@gmail.com> wrote:
> Try this:
>
> Job job = Job.getInstance(conf);
> job.setJobName(name);
>
> Artem Ervits
Re: Avromultiple output
Posted by Artem Ervits <ar...@gmail.com>.
Try this:

Job job = Job.getInstance(conf);
job.setJobName(name);
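
In the driver from the original message, the first line would then look
something like this sketch (Job.getInstance replaces the deprecated
new Job(conf, name) constructor; the job name "myad" and the rest of the
configuration are unchanged from the original code):

    // Hadoop 2 style: create the Job via the factory method rather than
    // the deprecated constructor
    Job job = Job.getInstance(getConf(), "myad");
    job.setJarByClass(myAdTextLineMapper.class);
    // ... remaining job configuration as in the original driver ...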
Artem Ervits