You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@avro.apache.org by David Ginzburg <da...@gmail.com> on 2015/02/22 04:55:27 UTC

Avromultiple output

Hi,
I am trying to run an MR job on EMR with AvroMultipleOutputs.




I get the following exception when running on an AMI with Hadoop 2.2 or 2.5:
Found interface org.apache.hadoop.mapreduce.JobContext, but class was
expected

I read it is related to incompatible hadoop versions, So I modified

When running on an AMI with Hadoop 1.0.3 I get the following exception:

java.lang.NullPointerException
    at
org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
    at
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
    at
org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)


The driver code is

// Driver fragment: configures and submits the "myad" MapReduce job.
// args[0] and args[1] are turned into input Paths; args[2] is the base output folder.
Job job = new Job(getConf(), "myad");



        // This output value class is set again (to UberRecord.class) further down,
        // after setReducerClass, so the NullWritable setting does not survive.
        job.setOutputValueClass(NullWritable.class);


        job.setJarByClass(myAdTextLineMapper.class);
        Path inputOflineFiles = new Path(args[0]);
        // inputOfUbberFiles is not referenced again in this fragment; only
        // inputOflineFiles is registered as an input path below.
        Path inputOfUbberFiles = new Path(args[1]);

         FileInputFormat.setInputPaths(job, inputOflineFiles);

        job.setMapperClass(myAdTextLineMapper.class);
        job.setMapOutputKeyClass(Text.class);
        // NOTE(review): the NullPointerException reported in this post is thrown from
        // SerializationFactory.getSerializer while the map output buffer is created.
        // Presumably UberRecord is an Avro-generated record (it exposes SCHEMA$) and not
        // a Writable, so no serializer is found for it as a raw map output value.
        // Consider AvroJob.setMapOutputValueSchema(job, UberRecord.SCHEMA$) with
        // AvroValue<UberRecord> as the mapper's output value type — TODO confirm.
        job.setMapOutputValueClass(UberRecord.class);

        job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
        AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
        AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);


        job.setReducerClass(myAdReducer.class);
        // NOTE(review): myAdReducer is declared with AvroKey<CharSequence> /
        // AvroValue<UberRecord> as its output types, while Text / UberRecord are
        // configured here — verify that these are meant to differ.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(UberRecord.class);
        job.setNumReduceTasks(2);
        String baseoutputFolder = args[2];
        // Passes the base output folder to the reducer through the job configuration.
        job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
                baseoutputFolder);
        // Stray empty statement; it has no effect.
        ;

// NOTE(review): presumably this replaces the output format set through
// job.setOutputFormatClass above with a lazy wrapper around the same class —
// confirm against the LazyOutputFormat documentation.
LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
        // Blocks until the job finishes; 0 on success, 1 on failure.
        return job.waitForCompletion(true) ? 0 : 1;


The mapper and reducer:
/**
 * Setup hook: assigns a new UberRecordProcessor to the {@code ubp} field.
 * Presumably a member of myAdTextLineMapper, whose class header is not shown.
 *
 * @param ctx the task context (not used here)
 */
@Override
    public void setup(Context ctx) {

        ubp = new UberRecordProcessor();
    }

    /**
     * Processes one input line and emits at most one (auctionId, UberRecord) pair.
     *
     * Calls {@code handleLineinMap(value)} and then inspects the {@code ub} field
     * (presumably populated by that call — its definition is not shown). Every
     * outcome is counted in the "myAd" counter group.
     *
     * @param key     the input key (not used here)
     * @param value   the input line
     * @param context the task context used for output and counters
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            handleLineinMap(value);
            if(ub!=null){
                // Key the record by its auction id so all records for one auction
                // reach the same reduce call.
                context.write(new Text(ub.getAuctionId().toString()), ub);
                context.getCounter("myAd",
"myAdTextLineMapper").increment(1);
            }else{
                context.getCounter("myAd",
"myAdTextLineMapperNull").increment(1);
            }
        } catch (Exception e) {
            // This catch also covers context.write above, so an IOException or
            // InterruptedException raised while writing is counted and logged as a
            // parse error instead of being propagated.
            context.getCounter("myAd",
"myAdTextLineMapperError").increment(1);
            logger.warn("could not parse line "+value.toString(),e);


        }
    }

public class myAdReducer extends
        Reducer<Text, UberRecord, AvroKey<CharSequence>,
AvroValue<UberRecord>> {

    private static Logger logger = Logger.getLogger(myAdReducer.class);
    // Configuration key under which the driver passes the base output folder.
    public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
    // amos is created in setup(); outputs is never assigned or used in the code shown.
    AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
    // Not referenced by the reducer methods shown in this post.
    UberRecordProcessor ubp = new UberRecordProcessor();
    // "year=%s/month=%s/day=%s/hour=%s"
    private String baseOutputPath;
    // Captured once when the reducer instance is constructed and appended to every
    // output path written by reduce().
    // NOTE(review): two attempts constructed in the same millisecond would get the
    // same value — verify this is unique enough.
    private long reduceAttemptUniqueIdentifier = System.currentTimeMillis();

    // 2015-02-01T18:00:25.673Z
    // Not referenced in the methods shown; presumably used by getPartition — TODO confirm.
    static DateTimeFormatter dateformat = DateTimeFormat
            .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");

    /**
     * Reducer setup hook: creates the AvroMultipleOutputs writer for this task and
     * reads the base output folder from the job configuration.
     *
     * @param context the task context supplying the configuration
     */
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {

        amos = new AvroMultipleOutputs(context);
        // Value stored by the driver under BASE_OUTPUT_FOLDER.
        baseOutputPath = context.getConfiguration().get(BASE_OUTPUT_FOLDER);

    }

    /**
     * Merges all records for one key into a single UberRecord and writes it through
     * AvroMultipleOutputs to a partition-specific path.
     *
     * Values whose exchange is null are skipped. Each remaining value is enriched by
     * the enricher selected for its exchange and then merged into the accumulator
     * with {@link #mergeUB}. The accumulator is written even when every value was
     * skipped, in which case it is still the freshly constructed UberRecord.
     *
     * NOTE(review): no cleanup() that closes {@code amos} appears in this post;
     * AvroMultipleOutputs presumably needs close() to flush its writers — TODO confirm.
     *
     * @param key     the grouping key, written out as the Avro key
     * @param values  the records grouped under {@code key}
     * @param context the task context (not used directly here)
     */
    @Override
    protected void reduce(Text key, Iterable<UberRecord> values, Context
context)
            throws IOException, InterruptedException {

        try {
            UberRecord ub = new UberRecord();
            for (UberRecord ubi : values) {
                // Records without an exchange cannot be matched to an enricher; skip them.
                if (ubi.getExchange() == null) {
                    continue;
                }
                BaseBidRequestEnricher enc = BaseBidRequestEnricher
                        .getEnricher(ubi.getExchange().toString());
                enc.enrich(ubi);
                ub = mergeUB(ub, ubi);
            }
            logger.info("Writing UberRecord [" + ub.toString() + "]");
            String partition = getPartition(ub);

            // context.write();
            // AvroKey<CharSequence>, AvroValue<UberRecord>>
            // Output path: <baseOutputPath>/<partition>/p<reduceAttemptUniqueIdentifier>
            amos.write(new AvroKey<CharSequence>(key.toString()),
                    new AvroValue<UberRecord>(ub), baseOutputPath + "/"
                            + partition + "/p" +
reduceAttemptUniqueIdentifier);
        } catch (Exception e) {
            // Any failure (including IOException / InterruptedException from the
            // write) is only printed and not rethrown, so the task continues and the
            // record for this key is silently dropped.
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    /**
     * Fills the null fields of {@code dest} with the corresponding values from
     * {@code src}; fields already set on {@code dest} are kept.
     *
     * The "engageData" field is merged one level deeper when {@code dest} already
     * has an EngageData: only its null sub-fields are filled from
     * {@code src.getEngageData()}. In every other case the field is copied from
     * {@code src} when it is null on {@code dest}.
     *
     * @param dest the accumulator; it is modified in place
     * @param src  the record supplying missing values
     * @return the same {@code dest} instance
     */
    public UberRecord mergeUB(UberRecord dest, UberRecord src) {
        List<Field> fields = UberRecord.getClassSchema().getFields();
        List<Field> engFields = EngageData.getClassSchema().getFields();
        for (Field field : fields) {
            if (field.name().equals("engageData")
                    && dest.getEngageData() != null) {
                EngageData mergedEng = dest.getEngageData();
                for (Field engField : engFields) {
                    if (dest.getEngageData().get(engField.name()) == null) {
                        // NOTE(review): src.getEngageData() is dereferenced without a
                        // null check; if src has no EngageData this throws a
                        // NullPointerException.
                        mergedEng.put(engField.name(),
                                src.getEngageData().get(engField.name()));
                    }

                }
                // mergedEng is the object already held by dest, so this re-sets the
                // same reference.
                dest.setEngageData(mergedEng);
            } else {
                if (dest.get(field.name()) == null) {
                    dest.put(field.name(), src.get(field.name()));
                }
            }
        }
        return dest;
    }

Re: Avromultiple output

Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>, de...@gmail.com.
Hadoop packaging changed from 1.x to 2.x. For example, hadoop-core is no longer
valid in 2.x.

Use these profiles (hadoop1, hadoop2):

<profile>
<id>hadoop1</id>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.0.1.3.0.0-107</version>
<exclusions>
<exclusion>
<artifactId>javax.servlet</artifactId>
<groupId>servlet-api</groupId>
</exclusion>
<exclusion>
<artifactId>org.mortbay.jetty</artifactId>
<groupId>servlet-api</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.hadoop.compression</groupId>
<artifactId>hadoop-gpl-compression</artifactId>
<version>0.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.12.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.6</version>
<classifier>hadoop1</classifier>
</dependency>
</dependencies>
</dependencyManagement>
</profile>
<profile>
<id>hadoop2</id>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.hadoop.compression.lzo</groupId>
<artifactId>hadoop-lzo</artifactId>
</dependency>
<dependency>
<groupId>org.apache.pig</groupId>
<artifactId>pig</artifactId>
<version>0.12.0.2.0.6.0-76</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.6</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</profile>

On Tue, Feb 24, 2015 at 11:46 PM, Artem Ervits <ar...@gmail.com>
wrote:

> Hadoop-core  is now hadoop-common
>
> Artem Ervits
> On Feb 24, 2015 12:57 PM, "David Ginzburg" <da...@gmail.com>
> wrote:
>
>> I am running with EMR
>> AMI version:3.3.1
>> Hadoop distribution:Amazon 2.4.0
>>
>> The hadoop jars are provided
>>
>>
>>  <hadoop.version>2.5.0-mr1-cdh5.2.1</hadoop.version>
>>
>> <dependencies>
>>
>>         <dependency>
>>             <groupId>com.google.guava</groupId>
>>             <artifactId>guava</artifactId>
>>             <version>18.0</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.kafka</groupId>
>>             <artifactId>kafka_2.10</artifactId>
>>             <version>0.8.2-beta</version>
>>             <scope>compile</scope>
>>             <exclusions>
>>                 <exclusion>
>>                     <groupId>com.sun.jmx</groupId>
>>                     <artifactId>jmxri</artifactId>
>>                 </exclusion>
>>                 <exclusion>
>>                     <groupId>com.sun.jdmk</groupId>
>>                     <artifactId>jmxtools</artifactId>
>>                 </exclusion>
>>                 <exclusion>
>>                     <groupId>javax.jms</groupId>
>>                     <artifactId>jms</artifactId>
>>                 </exclusion>
>>                 <exclusion>
>>                     <artifactId>slf4j-api</artifactId>
>>                     <groupId>org.slf4j</groupId>
>>                 </exclusion>
>>                 <exclusion>
>>                     <artifactId>snappy-java</artifactId>
>>                     <groupId>org.xerial.snappy</groupId>
>>                 </exclusion>
>>             </exclusions>
>>         </dependency>
>>         <dependency>
>>             <groupId>com.maxmind.geoip2</groupId>
>>             <artifactId>geoip2</artifactId>
>>             <version>2.1.0</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>com.amazonaws</groupId>
>>             <artifactId>aws-java-sdk</artifactId>
>>             <version>1.9.3</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>com.google.code.gson</groupId>
>>             <artifactId>gson</artifactId>
>>             <version>1.6</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>log4j</groupId>
>>             <artifactId>log4j</artifactId>
>>             <version>1.2.17</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>joda-time</groupId>
>>             <artifactId>joda-time</artifactId>
>>             <version>2.6</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.mockito</groupId>
>>             <artifactId>mockito-all</artifactId>
>>             <version>1.9.5</version>
>>             <scope>test</scope>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.mrunit</groupId>
>>             <artifactId>mrunit</artifactId>
>>             <version>1.1.0</version>
>>             <classifier>hadoop2</classifier>
>>             <exclusions>
>>                 <exclusion>
>>                     <artifactId>guava</artifactId>
>>                     <groupId>com.google.guava</groupId>
>>                 </exclusion>
>>             </exclusions>
>>         </dependency>
>>         <dependency>
>>             <groupId>junit</groupId>
>>             <artifactId>junit</artifactId>
>>             <version>4.11</version>
>>             <scope>test</scope>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.hadoop</groupId>
>>             <artifactId>hadoop-core</artifactId>
>>             <version>${hadoop.version}</version>
>>             <scope>provided</scope>
>>             <exclusions>
>>                 <exclusion>
>>                     <artifactId>jets3t</artifactId>
>>                     <groupId>net.java.dev.jets3t</groupId>
>>                 </exclusion>
>>             </exclusions>
>>         </dependency>
>>         <dependency>
>>             <artifactId>jets3t</artifactId>
>>             <groupId>net.java.dev.jets3t</groupId>
>>             <version>0.9.2</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.hadoop</groupId>
>>             <artifactId>hadoop-client</artifactId>
>>             <version>${hadoop.version}</version><!-- 2.5.0-mr1-cdh5.2.1
>> -->
>>             <scope>provided</scope>
>>             <exclusions>
>>                 <exclusion>
>>                     <artifactId>guava</artifactId>
>>                     <groupId>com.google.guava</groupId>
>>                 </exclusion>
>>                 <exclusion>
>>                     <artifactId>avro</artifactId>
>>                     <groupId>org.apache.avro</groupId>
>>                 </exclusion>
>>                 <exclusion>
>>                     <artifactId>slf4j-api</artifactId>
>>                     <groupId>org.slf4j</groupId>
>>                 </exclusion>
>>             </exclusions>
>>         </dependency>
>>         <dependency>
>>             <artifactId>avro-mapred</artifactId>
>>             <groupId>org.apache.avro</groupId>
>>             <version>1.7.7</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.mrunit</groupId>
>>             <artifactId>mrunit</artifactId>
>>             <version>1.0.0</version>
>>             <classifier>hadoop1</classifier>
>>             <scope>test</scope>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.json</groupId>
>>             <artifactId>org.json</artifactId>
>>             <version>chargebee-1.0</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>com.hadoop.gplcompression</groupId>
>>             <artifactId>hadoop-lzo</artifactId>
>>             <version>0.4.19</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>net.sf.uadetector</groupId>
>>             <artifactId>uadetector-resources</artifactId>
>>             <version>2014.04</version>
>>             <exclusions>
>>                 <exclusion>
>>                     <artifactId>slf4j-api</artifactId>
>>                     <groupId>org.slf4j</groupId>
>>                 </exclusion>
>>             </exclusions>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>commons-io</groupId>
>>             <artifactId>commons-io</artifactId>
>>             <version>2.4</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.codehaus.jackson</groupId>
>>             <artifactId>jackson-mapper-asl</artifactId>
>>             <version>1.9.13</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.codehaus.jackson</groupId>
>>             <artifactId>jackson-core-asl</artifactId>
>>             <version>1.9.13</version>
>>         </dependency>
>>
>>     </dependencies>
>>
>>
>>
>> On Tue, Feb 24, 2015 at 7:38 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com>
>> wrote:
>>
>>> Share your avro dependencies(versions) in case your using maven and
>>> hadoop dependencies (version)
>>>
>>>
>>>
>>> On Tue, Feb 24, 2015 at 11:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com>
>>> wrote:
>>>
>>>> Check your Hadoop version. In older version JobContext was interface
>>>> and in new one its class.
>>>>
>>>> On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <
>>>> davidginzburg@gmail.com> wrote:
>>>>
>>>>> Thank you for the answer.
>>>>>
>>>>> Tried but the exception
>>>>> * Error: Found interface org.apache.hadoop.mapreduce.JobContext, but
>>>>> class was expected*
>>>>> persists
>>>>>
>>>>> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <ar...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> try this
>>>>>>
>>>>>> Job job = Job.getInstance(conf);
>>>>>> Job.setName(name);
>>>>>>
>>>>>> Artem Ervits
>>>>>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <da...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I am trying to run an MR job on emr with AvromultipleOutput
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I get the following exception when running with AMI with hadoop 2.2
>>>>>>> 2.5
>>>>>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class
>>>>>>> was expected
>>>>>>>
>>>>>>> I read it is related to incompatible hadoop versions, So I modified
>>>>>>>
>>>>>>> When running with AMI with hadoop 103 I get the following exception:
>>>>>>>
>>>>>>> java.lang.NullPointerException
>>>>>>>     at
>>>>>>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>>>>>>     at
>>>>>>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>>>>>>     at
>>>>>>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>>>>>>     at
>>>>>>> org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>>>>>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>>     at
>>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>>>>>>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>>>>>>
>>>>>>>
>>>>>>> The driver code is
>>>>>>>
>>>>>>> Job job = new Job(getConf(), "myad");
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>         job.setOutputValueClass(NullWritable.class);
>>>>>>>
>>>>>>>
>>>>>>>         job.setJarByClass(myAdTextLineMapper.class);
>>>>>>>         Path inputOflineFiles = new Path(args[0]);
>>>>>>>         Path inputOfUbberFiles = new Path(args[1]);
>>>>>>>
>>>>>>>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>>>>>
>>>>>>>         job.setMapperClass(myAdTextLineMapper.class);
>>>>>>>         job.setMapOutputKeyClass(Text.class);
>>>>>>>         job.setMapOutputValueClass(UberRecord.class);
>>>>>>>
>>>>>>>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>>>>>>         AvroJob.setOutputKeySchema(job,
>>>>>>> Schema.create(Schema.Type.STRING));
>>>>>>>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>>>>>
>>>>>>>
>>>>>>>         job.setReducerClass(myAdReducer.class);
>>>>>>>         job.setOutputKeyClass(Text.class);
>>>>>>>         job.setOutputValueClass(UberRecord.class);
>>>>>>>         job.setNumReduceTasks(2);
>>>>>>>         String baseoutputFolder = args[2];
>>>>>>>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>>>>>>>                 baseoutputFolder);
>>>>>>>         ;
>>>>>>>
>>>>>>> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>>>>>>>
>>>>>>>         FileOutputFormat.setOutputPath(job, new
>>>>>>> Path(baseoutputFolder));
>>>>>>>         return job.waitForCompletion(true) ? 0 : 1;
>>>>>>>
>>>>>>>
>>>>>>> the mapper and reducers
>>>>>>> @Override
>>>>>>>     public void setup(Context ctx) {
>>>>>>>
>>>>>>>         ubp = new UberRecordProcessor();
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     protected void map(LongWritable key, Text value, Context context)
>>>>>>>             throws IOException, InterruptedException {
>>>>>>>         try {
>>>>>>>             handleLineinMap(value);
>>>>>>>             if(ub!=null){
>>>>>>>                 context.write(new
>>>>>>> Text(ub.getAuctionId().toString()), ub);
>>>>>>>                 context.getCounter("myAd",
>>>>>>> "myAdTextLineMapper").increment(1);
>>>>>>>             }else{
>>>>>>>                 context.getCounter("myAd",
>>>>>>> "myAdTextLineMapperNull").increment(1);
>>>>>>>             }
>>>>>>>         } catch (Exception e) {
>>>>>>>             context.getCounter("myAd",
>>>>>>> "myAdTextLineMapperError").increment(1);
>>>>>>>             logger.warn("could not parse line "+value.toString(),e);
>>>>>>>
>>>>>>>
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>> public class myAdReducer extends
>>>>>>>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
>>>>>>> AvroValue<UberRecord>> {
>>>>>>>
>>>>>>>     private static Logger logger =
>>>>>>> Logger.getLogger(myAdReducer.class);
>>>>>>>     public static final String BASE_OUTPUT_FOLDER =
>>>>>>> "base.output.folder";
>>>>>>>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord>
>>>>>>> outputs;
>>>>>>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>>>>>>     // "year=%s/month=%s/day=%s/hour=%s"
>>>>>>>     private String baseOutputPath;
>>>>>>>     private long reduceAttemptUniqueIdentifier =
>>>>>>> System.currentTimeMillis();
>>>>>>>
>>>>>>>     // 2015-02-01T18:00:25.673Z
>>>>>>>     static DateTimeFormatter dateformat = DateTimeFormat
>>>>>>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>>>>>
>>>>>>>     @Override
>>>>>>>     protected void setup(Context context) throws IOException,
>>>>>>>             InterruptedException {
>>>>>>>
>>>>>>>         amos = new AvroMultipleOutputs(context);
>>>>>>>         baseOutputPath =
>>>>>>> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>>>>>
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     protected void reduce(Text key, Iterable<UberRecord> values,
>>>>>>> Context context)
>>>>>>>             throws IOException, InterruptedException {
>>>>>>>
>>>>>>>         try {
>>>>>>>             UberRecord ub = new UberRecord();
>>>>>>>             for (UberRecord ubi : values) {
>>>>>>>                 // enrich
>>>>>>>                 if (ubi.getExchange() == null) {
>>>>>>>                     continue;
>>>>>>>                 }
>>>>>>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>>>>>>                         .getEnricher(ubi.getExchange().toString());
>>>>>>>                 enc.enrich(ubi);
>>>>>>>                 ub = mergeUB(ub, ubi);
>>>>>>>             }
>>>>>>>             logger.info("Writing UberRecord [" + ub.toString() +
>>>>>>> "]");
>>>>>>>             String partition = getPartition(ub);
>>>>>>>
>>>>>>>             // context.write();
>>>>>>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>>>>>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>>>>>>                     new AvroValue<UberRecord>(ub), baseOutputPath +
>>>>>>> "/"
>>>>>>>                             + partition + "/p" +
>>>>>>> reduceAttemptUniqueIdentifier);
>>>>>>>         } catch (Exception e) {
>>>>>>>             // TODO Auto-generated catch block
>>>>>>>             e.printStackTrace();
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>>>>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>>>>>>         List<Field> engFields =
>>>>>>> EngageData.getClassSchema().getFields();
>>>>>>>         for (Field field : fields) {
>>>>>>>             if (field.name().equals("engageData")
>>>>>>>                     && dest.getEngageData() != null) {
>>>>>>>                 EngageData mergedEng = dest.getEngageData();
>>>>>>>                 for (Field engField : engFields) {
>>>>>>>                     if (dest.getEngageData().get(engField.name()) ==
>>>>>>> null) {
>>>>>>>                         mergedEng.put(engField.name(),
>>>>>>>
>>>>>>> src.getEngageData().get(engField.name()));
>>>>>>>                     }
>>>>>>>
>>>>>>>                 }
>>>>>>>                 dest.setEngageData(mergedEng);
>>>>>>>             } else {
>>>>>>>                 if (dest.get(field.name()) == null) {
>>>>>>>                     dest.put(field.name(), src.get(field.name()));
>>>>>>>                 }
>>>>>>>             }
>>>>>>>         }
>>>>>>>         return dest;
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Deepak
>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>


-- 
Deepak

Re: Avromultiple output

Posted by Artem Ervits <ar...@gmail.com>.
Hadoop-core  is now hadoop-common

Artem Ervits
On Feb 24, 2015 12:57 PM, "David Ginzburg" <da...@gmail.com> wrote:

> I am running with EMR
> AMI version:3.3.1
> Hadoop distribution:Amazon 2.4.0
>
> The hadoop jars are provided
>
>
>  <hadoop.version>2.5.0-mr1-cdh5.2.1</hadoop.version>
>
> <dependencies>
>
>         <dependency>
>             <groupId>com.google.guava</groupId>
>             <artifactId>guava</artifactId>
>             <version>18.0</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.10</artifactId>
>             <version>0.8.2-beta</version>
>             <scope>compile</scope>
>             <exclusions>
>                 <exclusion>
>                     <groupId>com.sun.jmx</groupId>
>                     <artifactId>jmxri</artifactId>
>                 </exclusion>
>                 <exclusion>
>                     <groupId>com.sun.jdmk</groupId>
>                     <artifactId>jmxtools</artifactId>
>                 </exclusion>
>                 <exclusion>
>                     <groupId>javax.jms</groupId>
>                     <artifactId>jms</artifactId>
>                 </exclusion>
>                 <exclusion>
>                     <artifactId>slf4j-api</artifactId>
>                     <groupId>org.slf4j</groupId>
>                 </exclusion>
>                 <exclusion>
>                     <artifactId>snappy-java</artifactId>
>                     <groupId>org.xerial.snappy</groupId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>         <dependency>
>             <groupId>com.maxmind.geoip2</groupId>
>             <artifactId>geoip2</artifactId>
>             <version>2.1.0</version>
>         </dependency>
>         <dependency>
>             <groupId>com.amazonaws</groupId>
>             <artifactId>aws-java-sdk</artifactId>
>             <version>1.9.3</version>
>         </dependency>
>         <dependency>
>             <groupId>com.google.code.gson</groupId>
>             <artifactId>gson</artifactId>
>             <version>1.6</version>
>         </dependency>
>         <dependency>
>             <groupId>log4j</groupId>
>             <artifactId>log4j</artifactId>
>             <version>1.2.17</version>
>         </dependency>
>         <dependency>
>             <groupId>joda-time</groupId>
>             <artifactId>joda-time</artifactId>
>             <version>2.6</version>
>         </dependency>
>         <dependency>
>             <groupId>org.mockito</groupId>
>             <artifactId>mockito-all</artifactId>
>             <version>1.9.5</version>
>             <scope>test</scope>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.mrunit</groupId>
>             <artifactId>mrunit</artifactId>
>             <version>1.1.0</version>
>             <classifier>hadoop2</classifier>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>guava</artifactId>
>                     <groupId>com.google.guava</groupId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>         <dependency>
>             <groupId>junit</groupId>
>             <artifactId>junit</artifactId>
>             <version>4.11</version>
>             <scope>test</scope>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.hadoop</groupId>
>             <artifactId>hadoop-core</artifactId>
>             <version>${hadoop.version}</version>
>             <scope>provided</scope>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>jets3t</artifactId>
>                     <groupId>net.java.dev.jets3t</groupId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>         <dependency>
>             <artifactId>jets3t</artifactId>
>             <groupId>net.java.dev.jets3t</groupId>
>             <version>0.9.2</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.hadoop</groupId>
>             <artifactId>hadoop-client</artifactId>
>             <version>${hadoop.version}</version><!-- 2.5.0-mr1-cdh5.2.1 -->
>             <scope>provided</scope>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>guava</artifactId>
>                     <groupId>com.google.guava</groupId>
>                 </exclusion>
>                 <exclusion>
>                     <artifactId>avro</artifactId>
>                     <groupId>org.apache.avro</groupId>
>                 </exclusion>
>                 <exclusion>
>                     <artifactId>slf4j-api</artifactId>
>                     <groupId>org.slf4j</groupId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>         <dependency>
>             <artifactId>avro-mapred</artifactId>
>             <groupId>org.apache.avro</groupId>
>             <version>1.7.7</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.mrunit</groupId>
>             <artifactId>mrunit</artifactId>
>             <version>1.0.0</version>
>             <classifier>hadoop1</classifier>
>             <scope>test</scope>
>         </dependency>
>         <dependency>
>             <groupId>org.json</groupId>
>             <artifactId>org.json</artifactId>
>             <version>chargebee-1.0</version>
>         </dependency>
>         <dependency>
>             <groupId>com.hadoop.gplcompression</groupId>
>             <artifactId>hadoop-lzo</artifactId>
>             <version>0.4.19</version>
>         </dependency>
>         <dependency>
>             <groupId>net.sf.uadetector</groupId>
>             <artifactId>uadetector-resources</artifactId>
>             <version>2014.04</version>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>slf4j-api</artifactId>
>                     <groupId>org.slf4j</groupId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>
>         <dependency>
>             <groupId>commons-io</groupId>
>             <artifactId>commons-io</artifactId>
>             <version>2.4</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.codehaus.jackson</groupId>
>             <artifactId>jackson-mapper-asl</artifactId>
>             <version>1.9.13</version>
>         </dependency>
>         <dependency>
>             <groupId>org.codehaus.jackson</groupId>
>             <artifactId>jackson-core-asl</artifactId>
>             <version>1.9.13</version>
>         </dependency>
>
>     </dependencies>
>
>
>
> On Tue, Feb 24, 2015 at 7:38 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com>
> wrote:
>
>> Share your avro dependencies(versions) in case your using maven and
>> hadoop dependencies (version)
>>
>>
>>
>> On Tue, Feb 24, 2015 at 11:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com>
>> wrote:
>>
>>> Check your Hadoop version. In older version JobContext was interface and
>>> in new one its class.
>>>
>>> On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <
>>> davidginzburg@gmail.com> wrote:
>>>
>>>> Thank you for the answer.
>>>>
>>>> Tried but the exception
>>>> * Error: Found interface org.apache.hadoop.mapreduce.JobContext, but
>>>> class was expected*
>>>> persists
>>>>
>>>> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <ar...@gmail.com>
>>>> wrote:
>>>>
>>>>> try this
>>>>>
>>>>> Job job = Job.getInstance(conf);
>>>>> Job.setName(name);
>>>>>
>>>>> Artem Ervits
>>>>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <da...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am trying to run an MR job on emr with AvromultipleOutput
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> I get the following exception when running with AMI with hadoop 2.2
>>>>>> 2.5
>>>>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
>>>>>> expected
>>>>>>
>>>>>> I read it is related to incompatible hadoop versions, So I modified
>>>>>>
>>>>>> When running with AMI with hadoop 103 I get the following exception:
>>>>>>
>>>>>> java.lang.NullPointerException
>>>>>>     at
>>>>>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>>>>>     at
>>>>>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>>>>>     at
>>>>>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>>>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>>>>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>>     at
>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>>>>>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>>>>>
>>>>>>
>>>>>> The driver code is
>>>>>>
>>>>>> Job job = new Job(getConf(), "myad");
>>>>>>
>>>>>>
>>>>>>
>>>>>>         job.setOutputValueClass(NullWritable.class);
>>>>>>
>>>>>>
>>>>>>         job.setJarByClass(myAdTextLineMapper.class);
>>>>>>         Path inputOflineFiles = new Path(args[0]);
>>>>>>         Path inputOfUbberFiles = new Path(args[1]);
>>>>>>
>>>>>>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>>>>
>>>>>>         job.setMapperClass(myAdTextLineMapper.class);
>>>>>>         job.setMapOutputKeyClass(Text.class);
>>>>>>         job.setMapOutputValueClass(UberRecord.class);
>>>>>>
>>>>>>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>>>>>         AvroJob.setOutputKeySchema(job,
>>>>>> Schema.create(Schema.Type.STRING));
>>>>>>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>>>>
>>>>>>
>>>>>>         job.setReducerClass(myAdReducer.class);
>>>>>>         job.setOutputKeyClass(Text.class);
>>>>>>         job.setOutputValueClass(UberRecord.class);
>>>>>>         job.setNumReduceTasks(2);
>>>>>>         String baseoutputFolder = args[2];
>>>>>>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>>>>>>                 baseoutputFolder);
>>>>>>         ;
>>>>>>
>>>>>> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>>>>>>
>>>>>>         FileOutputFormat.setOutputPath(job, new
>>>>>> Path(baseoutputFolder));
>>>>>>         return job.waitForCompletion(true) ? 0 : 1;
>>>>>>
>>>>>>
>>>>>> the mapper and reducers
>>>>>> @Override
>>>>>>     public void setup(Context ctx) {
>>>>>>
>>>>>>         ubp = new UberRecordProcessor();
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     protected void map(LongWritable key, Text value, Context context)
>>>>>>             throws IOException, InterruptedException {
>>>>>>         try {
>>>>>>             handleLineinMap(value);
>>>>>>             if(ub!=null){
>>>>>>                 context.write(new Text(ub.getAuctionId().toString()),
>>>>>> ub);
>>>>>>                 context.getCounter("myAd",
>>>>>> "myAdTextLineMapper").increment(1);
>>>>>>             }else{
>>>>>>                 context.getCounter("myAd",
>>>>>> "myAdTextLineMapperNull").increment(1);
>>>>>>             }
>>>>>>         } catch (Exception e) {
>>>>>>             context.getCounter("myAd",
>>>>>> "myAdTextLineMapperError").increment(1);
>>>>>>             logger.warn("could not parse line "+value.toString(),e);
>>>>>>
>>>>>>
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>> public class myAdReducer extends
>>>>>>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
>>>>>> AvroValue<UberRecord>> {
>>>>>>
>>>>>>     private static Logger logger =
>>>>>> Logger.getLogger(myAdReducer.class);
>>>>>>     public static final String BASE_OUTPUT_FOLDER =
>>>>>> "base.output.folder";
>>>>>>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord>
>>>>>> outputs;
>>>>>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>>>>>     // "year=%s/month=%s/day=%s/hour=%s"
>>>>>>     private String baseOutputPath;
>>>>>>     private long reduceAttemptUniqueIdentifier =
>>>>>> System.currentTimeMillis();
>>>>>>
>>>>>>     // 2015-02-01T18:00:25.673Z
>>>>>>     static DateTimeFormatter dateformat = DateTimeFormat
>>>>>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>>>>
>>>>>>     @Override
>>>>>>     protected void setup(Context context) throws IOException,
>>>>>>             InterruptedException {
>>>>>>
>>>>>>         amos = new AvroMultipleOutputs(context);
>>>>>>         baseOutputPath =
>>>>>> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>>>>
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     protected void reduce(Text key, Iterable<UberRecord> values,
>>>>>> Context context)
>>>>>>             throws IOException, InterruptedException {
>>>>>>
>>>>>>         try {
>>>>>>             UberRecord ub = new UberRecord();
>>>>>>             for (UberRecord ubi : values) {
>>>>>>                 // enrich
>>>>>>                 if (ubi.getExchange() == null) {
>>>>>>                     continue;
>>>>>>                 }
>>>>>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>>>>>                         .getEnricher(ubi.getExchange().toString());
>>>>>>                 enc.enrich(ubi);
>>>>>>                 ub = mergeUB(ub, ubi);
>>>>>>             }
>>>>>>             logger.info("Writing UberRecord [" + ub.toString() +
>>>>>> "]");
>>>>>>             String partition = getPartition(ub);
>>>>>>
>>>>>>             // context.write();
>>>>>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>>>>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>>>>>                     new AvroValue<UberRecord>(ub), baseOutputPath +
>>>>>> "/"
>>>>>>                             + partition + "/p" +
>>>>>> reduceAttemptUniqueIdentifier);
>>>>>>         } catch (Exception e) {
>>>>>>             // TODO Auto-generated catch block
>>>>>>             e.printStackTrace();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>>>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>>>>>         List<Field> engFields =
>>>>>> EngageData.getClassSchema().getFields();
>>>>>>         for (Field field : fields) {
>>>>>>             if (field.name().equals("engageData")
>>>>>>                     && dest.getEngageData() != null) {
>>>>>>                 EngageData mergedEng = dest.getEngageData();
>>>>>>                 for (Field engField : engFields) {
>>>>>>                     if (dest.getEngageData().get(engField.name()) ==
>>>>>> null) {
>>>>>>                         mergedEng.put(engField.name(),
>>>>>>
>>>>>> src.getEngageData().get(engField.name()));
>>>>>>                     }
>>>>>>
>>>>>>                 }
>>>>>>                 dest.setEngageData(mergedEng);
>>>>>>             } else {
>>>>>>                 if (dest.get(field.name()) == null) {
>>>>>>                     dest.put(field.name(), src.get(field.name()));
>>>>>>                 }
>>>>>>             }
>>>>>>         }
>>>>>>         return dest;
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>>
>>> --
>>> Deepak
>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>

Re: Avromultiple output

Posted by David Ginzburg <da...@gmail.com>.
I am running on EMR:
AMI version: 3.3.1
Hadoop distribution: Amazon 2.4.0

The Hadoop jars are declared with "provided" scope. The Hadoop version property and the dependencies are:


 <hadoop.version>2.5.0-mr1-cdh5.2.1</hadoop.version>

<dependencies>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2-beta</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>com.sun.jmx</groupId>
                    <artifactId>jmxri</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.sun.jdmk</groupId>
                    <artifactId>jmxtools</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.jms</groupId>
                    <artifactId>jms</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.maxmind.geoip2</groupId>
            <artifactId>geoip2</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk</artifactId>
            <version>1.9.3</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>1.6</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.9.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.1.0</version>
            <classifier>hadoop2</classifier>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>jets3t</artifactId>
                    <groupId>net.java.dev.jets3t</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <artifactId>jets3t</artifactId>
            <groupId>net.java.dev.jets3t</groupId>
            <version>0.9.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version><!-- 2.5.0-mr1-cdh5.2.1 -->
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>avro</artifactId>
                    <groupId>org.apache.avro</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <artifactId>avro-mapred</artifactId>
            <groupId>org.apache.avro</groupId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.mrunit</groupId>
            <artifactId>mrunit</artifactId>
            <version>1.0.0</version>
            <classifier>hadoop1</classifier>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>org.json</artifactId>
            <version>chargebee-1.0</version>
        </dependency>
        <dependency>
            <groupId>com.hadoop.gplcompression</groupId>
            <artifactId>hadoop-lzo</artifactId>
            <version>0.4.19</version>
        </dependency>
        <dependency>
            <groupId>net.sf.uadetector</groupId>
            <artifactId>uadetector-resources</artifactId>
            <version>2014.04</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>

        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>1.9.13</version>
        </dependency>

    </dependencies>



On Tue, Feb 24, 2015 at 7:38 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:

> Share your avro dependencies(versions) in case your using maven and hadoop
> dependencies (version)
>
>
>
> On Tue, Feb 24, 2015 at 11:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com>
> wrote:
>
>> Check your Hadoop version. In the older version (Hadoop 1.x) JobContext was
>> a class, and in the newer one (Hadoop 2.x) it's an interface.
>>
>> On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <davidginzburg@gmail.com
>> > wrote:
>>
>>> Thank you for the answer.
>>>
>>> Tried but the exception
>>> * Error: Found interface org.apache.hadoop.mapreduce.JobContext, but
>>> class was expected*
>>> persists
>>>
>>> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <ar...@gmail.com>
>>> wrote:
>>>
>>>> try this
>>>>
>>>> Job job = Job.getInstance(conf);
>>>> Job.setName(name);
>>>>
>>>> Artem Ervits
>>>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <da...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am trying to run an MR job on emr with AvromultipleOutput
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I get the following exception when running with AMI with hadoop 2.2 2.5
>>>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
>>>>> expected
>>>>>
>>>>> I read it is related to incompatible hadoop versions, So I modified
>>>>>
>>>>> When running with AMI with hadoop 103 I get the following exception:
>>>>>
>>>>> java.lang.NullPointerException
>>>>>     at
>>>>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>>>>     at
>>>>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>>>>     at
>>>>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>>>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>>     at
>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>>>>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>>>>
>>>>>
>>>>> The driver code is
>>>>>
>>>>> Job job = new Job(getConf(), "myad");
>>>>>
>>>>>
>>>>>
>>>>>         job.setOutputValueClass(NullWritable.class);
>>>>>
>>>>>
>>>>>         job.setJarByClass(myAdTextLineMapper.class);
>>>>>         Path inputOflineFiles = new Path(args[0]);
>>>>>         Path inputOfUbberFiles = new Path(args[1]);
>>>>>
>>>>>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>>>
>>>>>         job.setMapperClass(myAdTextLineMapper.class);
>>>>>         job.setMapOutputKeyClass(Text.class);
>>>>>         job.setMapOutputValueClass(UberRecord.class);
>>>>>
>>>>>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>>>>         AvroJob.setOutputKeySchema(job,
>>>>> Schema.create(Schema.Type.STRING));
>>>>>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>>>
>>>>>
>>>>>         job.setReducerClass(myAdReducer.class);
>>>>>         job.setOutputKeyClass(Text.class);
>>>>>         job.setOutputValueClass(UberRecord.class);
>>>>>         job.setNumReduceTasks(2);
>>>>>         String baseoutputFolder = args[2];
>>>>>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>>>>>                 baseoutputFolder);
>>>>>         ;
>>>>>
>>>>> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>>>>>
>>>>>         FileOutputFormat.setOutputPath(job, new
>>>>> Path(baseoutputFolder));
>>>>>         return job.waitForCompletion(true) ? 0 : 1;
>>>>>
>>>>>
>>>>> the mapper and reducers
>>>>> @Override
>>>>>     public void setup(Context ctx) {
>>>>>
>>>>>         ubp = new UberRecordProcessor();
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     protected void map(LongWritable key, Text value, Context context)
>>>>>             throws IOException, InterruptedException {
>>>>>         try {
>>>>>             handleLineinMap(value);
>>>>>             if(ub!=null){
>>>>>                 context.write(new Text(ub.getAuctionId().toString()),
>>>>> ub);
>>>>>                 context.getCounter("myAd",
>>>>> "myAdTextLineMapper").increment(1);
>>>>>             }else{
>>>>>                 context.getCounter("myAd",
>>>>> "myAdTextLineMapperNull").increment(1);
>>>>>             }
>>>>>         } catch (Exception e) {
>>>>>             context.getCounter("myAd",
>>>>> "myAdTextLineMapperError").increment(1);
>>>>>             logger.warn("could not parse line "+value.toString(),e);
>>>>>
>>>>>
>>>>>         }
>>>>>     }
>>>>>
>>>>> public class myAdReducer extends
>>>>>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
>>>>> AvroValue<UberRecord>> {
>>>>>
>>>>>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>>>>>     public static final String BASE_OUTPUT_FOLDER =
>>>>> "base.output.folder";
>>>>>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord>
>>>>> outputs;
>>>>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>>>>     // "year=%s/month=%s/day=%s/hour=%s"
>>>>>     private String baseOutputPath;
>>>>>     private long reduceAttemptUniqueIdentifier =
>>>>> System.currentTimeMillis();
>>>>>
>>>>>     // 2015-02-01T18:00:25.673Z
>>>>>     static DateTimeFormatter dateformat = DateTimeFormat
>>>>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>>>
>>>>>     @Override
>>>>>     protected void setup(Context context) throws IOException,
>>>>>             InterruptedException {
>>>>>
>>>>>         amos = new AvroMultipleOutputs(context);
>>>>>         baseOutputPath =
>>>>> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>>>
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     protected void reduce(Text key, Iterable<UberRecord> values,
>>>>> Context context)
>>>>>             throws IOException, InterruptedException {
>>>>>
>>>>>         try {
>>>>>             UberRecord ub = new UberRecord();
>>>>>             for (UberRecord ubi : values) {
>>>>>                 // enrich
>>>>>                 if (ubi.getExchange() == null) {
>>>>>                     continue;
>>>>>                 }
>>>>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>>>>                         .getEnricher(ubi.getExchange().toString());
>>>>>                 enc.enrich(ubi);
>>>>>                 ub = mergeUB(ub, ubi);
>>>>>             }
>>>>>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>>>>>             String partition = getPartition(ub);
>>>>>
>>>>>             // context.write();
>>>>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>>>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>>>>                     new AvroValue<UberRecord>(ub), baseOutputPath + "/"
>>>>>                             + partition + "/p" +
>>>>> reduceAttemptUniqueIdentifier);
>>>>>         } catch (Exception e) {
>>>>>             // TODO Auto-generated catch block
>>>>>             e.printStackTrace();
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>>>>         List<Field> engFields =
>>>>> EngageData.getClassSchema().getFields();
>>>>>         for (Field field : fields) {
>>>>>             if (field.name().equals("engageData")
>>>>>                     && dest.getEngageData() != null) {
>>>>>                 EngageData mergedEng = dest.getEngageData();
>>>>>                 for (Field engField : engFields) {
>>>>>                     if (dest.getEngageData().get(engField.name()) ==
>>>>> null) {
>>>>>                         mergedEng.put(engField.name(),
>>>>>
>>>>> src.getEngageData().get(engField.name()));
>>>>>                     }
>>>>>
>>>>>                 }
>>>>>                 dest.setEngageData(mergedEng);
>>>>>             } else {
>>>>>                 if (dest.get(field.name()) == null) {
>>>>>                     dest.put(field.name(), src.get(field.name()));
>>>>>                 }
>>>>>             }
>>>>>         }
>>>>>         return dest;
>>>>>     }
>>>>>
>>>>>
>>>>>
>>>>>
>>>
>>
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Deepak
>
>

Re: Avromultiple output

Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>, de...@gmail.com.
Share your Avro dependencies (and their versions), in case you're using Maven,
and your Hadoop dependencies (and their versions).



On Tue, Feb 24, 2015 at 11:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com> wrote:

> Check your Hadoop version. In the older version (Hadoop 1.x) JobContext was
> a class, and in the newer one (Hadoop 2.x) it's an interface.
>
> On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <da...@gmail.com>
> wrote:
>
>> Thank you for the answer.
>>
>> Tried but the exception
>> * Error: Found interface org.apache.hadoop.mapreduce.JobContext, but
>> class was expected*
>> persists
>>
>> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <ar...@gmail.com>
>> wrote:
>>
>>> try this
>>>
>>> Job job = Job.getInstance(conf);
>>> Job.setName(name);
>>>
>>> Artem Ervits
>>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <da...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I am trying to run an MR job on emr with AvromultipleOutput
>>>>
>>>>
>>>>
>>>>
>>>> I get the following exception when running with AMI with hadoop 2.2 2.5
>>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
>>>> expected
>>>>
>>>> I read it is related to incompatible hadoop versions, So I modified
>>>>
>>>> When running with AMI with hadoop 103 I get the following exception:
>>>>
>>>> java.lang.NullPointerException
>>>>     at
>>>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>>>     at
>>>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>>>     at
>>>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>     at
>>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>>>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>>>
>>>>
>>>> The driver code is
>>>>
>>>> Job job = new Job(getConf(), "myad");
>>>>
>>>>
>>>>
>>>>         job.setOutputValueClass(NullWritable.class);
>>>>
>>>>
>>>>         job.setJarByClass(myAdTextLineMapper.class);
>>>>         Path inputOflineFiles = new Path(args[0]);
>>>>         Path inputOfUbberFiles = new Path(args[1]);
>>>>
>>>>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>>
>>>>         job.setMapperClass(myAdTextLineMapper.class);
>>>>         job.setMapOutputKeyClass(Text.class);
>>>>         job.setMapOutputValueClass(UberRecord.class);
>>>>
>>>>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>>>         AvroJob.setOutputKeySchema(job,
>>>> Schema.create(Schema.Type.STRING));
>>>>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>>
>>>>
>>>>         job.setReducerClass(myAdReducer.class);
>>>>         job.setOutputKeyClass(Text.class);
>>>>         job.setOutputValueClass(UberRecord.class);
>>>>         job.setNumReduceTasks(2);
>>>>         String baseoutputFolder = args[2];
>>>>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>>>>                 baseoutputFolder);
>>>>         ;
>>>>
>>>> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>>>>
>>>>         FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>>>>         return job.waitForCompletion(true) ? 0 : 1;
>>>>
>>>>
>>>> the mapper and reducers
>>>> @Override
>>>>     public void setup(Context ctx) {
>>>>
>>>>         ubp = new UberRecordProcessor();
>>>>     }
>>>>
>>>>     @Override
>>>>     protected void map(LongWritable key, Text value, Context context)
>>>>             throws IOException, InterruptedException {
>>>>         try {
>>>>             handleLineinMap(value);
>>>>             if(ub!=null){
>>>>                 context.write(new Text(ub.getAuctionId().toString()),
>>>> ub);
>>>>                 context.getCounter("myAd",
>>>> "myAdTextLineMapper").increment(1);
>>>>             }else{
>>>>                 context.getCounter("myAd",
>>>> "myAdTextLineMapperNull").increment(1);
>>>>             }
>>>>         } catch (Exception e) {
>>>>             context.getCounter("myAd",
>>>> "myAdTextLineMapperError").increment(1);
>>>>             logger.warn("could not parse line "+value.toString(),e);
>>>>
>>>>
>>>>         }
>>>>     }
>>>>
>>>> public class myAdReducer extends
>>>>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
>>>> AvroValue<UberRecord>> {
>>>>
>>>>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>>>>     public static final String BASE_OUTPUT_FOLDER =
>>>> "base.output.folder";
>>>>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
>>>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>>>     // "year=%s/month=%s/day=%s/hour=%s"
>>>>     private String baseOutputPath;
>>>>     private long reduceAttemptUniqueIdentifier =
>>>> System.currentTimeMillis();
>>>>
>>>>     // 2015-02-01T18:00:25.673Z
>>>>     static DateTimeFormatter dateformat = DateTimeFormat
>>>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>>
>>>>     @Override
>>>>     protected void setup(Context context) throws IOException,
>>>>             InterruptedException {
>>>>
>>>>         amos = new AvroMultipleOutputs(context);
>>>>         baseOutputPath =
>>>> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>>
>>>>     }
>>>>
>>>>     @Override
>>>>     protected void reduce(Text key, Iterable<UberRecord> values,
>>>> Context context)
>>>>             throws IOException, InterruptedException {
>>>>
>>>>         try {
>>>>             UberRecord ub = new UberRecord();
>>>>             for (UberRecord ubi : values) {
>>>>                 // enrich
>>>>                 if (ubi.getExchange() == null) {
>>>>                     continue;
>>>>                 }
>>>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>>>                         .getEnricher(ubi.getExchange().toString());
>>>>                 enc.enrich(ubi);
>>>>                 ub = mergeUB(ub, ubi);
>>>>             }
>>>>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>>>>             String partition = getPartition(ub);
>>>>
>>>>             // context.write();
>>>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>>>                     new AvroValue<UberRecord>(ub), baseOutputPath + "/"
>>>>                             + partition + "/p" +
>>>> reduceAttemptUniqueIdentifier);
>>>>         } catch (Exception e) {
>>>>             // TODO Auto-generated catch block
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>>
>>>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>>>         List<Field> engFields = EngageData.getClassSchema().getFields();
>>>>         for (Field field : fields) {
>>>>             if (field.name().equals("engageData")
>>>>                     && dest.getEngageData() != null) {
>>>>                 EngageData mergedEng = dest.getEngageData();
>>>>                 for (Field engField : engFields) {
>>>>                     if (dest.getEngageData().get(engField.name()) ==
>>>> null) {
>>>>                         mergedEng.put(engField.name(),
>>>>
>>>> src.getEngageData().get(engField.name()));
>>>>                     }
>>>>
>>>>                 }
>>>>                 dest.setEngageData(mergedEng);
>>>>             } else {
>>>>                 if (dest.get(field.name()) == null) {
>>>>                     dest.put(field.name(), src.get(field.name()));
>>>>                 }
>>>>             }
>>>>         }
>>>>         return dest;
>>>>     }
>>>>
>>>>
>>>>
>>>>
>>
>
>
> --
> Deepak
>
>


-- 
Deepak

Re: Avromultiple output

Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>, de...@gmail.com.
Check your Hadoop version. In the older version (Hadoop 1.x) JobContext was a
class, and in the newer one (Hadoop 2.x) it's an interface.

On Tue, Feb 24, 2015 at 10:28 PM, David Ginzburg <da...@gmail.com>
wrote:

> Thank you for the answer.
>
> Tried but the exception
> * Error: Found interface org.apache.hadoop.mapreduce.JobContext, but class
> was expected*
> persists
>
> On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <ar...@gmail.com>
> wrote:
>
>> try this
>>
>> Job job = Job.getInstance(conf);
>> Job.setName(name);
>>
>> Artem Ervits
>> On Feb 21, 2015 10:57 PM, "David Ginzburg" <da...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am trying to run an MR job on emr with AvromultipleOutput
>>>
>>>
>>>
>>>
>>> I get the following exception when running with AMI with hadoop 2.2 2.5
>>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
>>> expected
>>>
>>> I read it is related to incompatible hadoop versions, So I modified
>>>
>>> When running with AMI with hadoop 103 I get the following exception:
>>>
>>> java.lang.NullPointerException
>>>     at
>>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>>     at
>>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>>     at
>>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>>     at
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>>
>>>
>>> The driver code is
>>>
>>> Job job = new Job(getConf(), "myad");
>>>
>>>
>>>
>>>         job.setOutputValueClass(NullWritable.class);
>>>
>>>
>>>         job.setJarByClass(myAdTextLineMapper.class);
>>>         Path inputOflineFiles = new Path(args[0]);
>>>         Path inputOfUbberFiles = new Path(args[1]);
>>>
>>>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>>>
>>>         job.setMapperClass(myAdTextLineMapper.class);
>>>         job.setMapOutputKeyClass(Text.class);
>>>         job.setMapOutputValueClass(UberRecord.class);
>>>
>>>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>>         AvroJob.setOutputKeySchema(job,
>>> Schema.create(Schema.Type.STRING));
>>>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>>
>>>
>>>         job.setReducerClass(myAdReducer.class);
>>>         job.setOutputKeyClass(Text.class);
>>>         job.setOutputValueClass(UberRecord.class);
>>>         job.setNumReduceTasks(2);
>>>         String baseoutputFolder = args[2];
>>>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>>>                 baseoutputFolder);
>>>         ;
>>>
>>> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>>>
>>>         FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>>>         return job.waitForCompletion(true) ? 0 : 1;
>>>
>>>
>>> the mapper and reducers
>>> @Override
>>>     public void setup(Context ctx) {
>>>
>>>         ubp = new UberRecordProcessor();
>>>     }
>>>
>>>     @Override
>>>     protected void map(LongWritable key, Text value, Context context)
>>>             throws IOException, InterruptedException {
>>>         try {
>>>             handleLineinMap(value);
>>>             if(ub!=null){
>>>                 context.write(new Text(ub.getAuctionId().toString()),
>>> ub);
>>>                 context.getCounter("myAd",
>>> "myAdTextLineMapper").increment(1);
>>>             }else{
>>>                 context.getCounter("myAd",
>>> "myAdTextLineMapperNull").increment(1);
>>>             }
>>>         } catch (Exception e) {
>>>             context.getCounter("myAd",
>>> "myAdTextLineMapperError").increment(1);
>>>             logger.warn("could not parse line "+value.toString(),e);
>>>
>>>
>>>         }
>>>     }
>>>
>>> public class myAdReducer extends
>>>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
>>> AvroValue<UberRecord>> {
>>>
>>>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>>>     public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>>>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
>>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>>     // "year=%s/month=%s/day=%s/hour=%s"
>>>     private String baseOutputPath;
>>>     private long reduceAttemptUniqueIdentifier =
>>> System.currentTimeMillis();
>>>
>>>     // 2015-02-01T18:00:25.673Z
>>>     static DateTimeFormatter dateformat = DateTimeFormat
>>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>>
>>>     @Override
>>>     protected void setup(Context context) throws IOException,
>>>             InterruptedException {
>>>
>>>         amos = new AvroMultipleOutputs(context);
>>>         baseOutputPath =
>>> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>>
>>>     }
>>>
>>>     @Override
>>>     protected void reduce(Text key, Iterable<UberRecord> values, Context
>>> context)
>>>             throws IOException, InterruptedException {
>>>
>>>         try {
>>>             UberRecord ub = new UberRecord();
>>>             for (UberRecord ubi : values) {
>>>                 // enrich
>>>                 if (ubi.getExchange() == null) {
>>>                     continue;
>>>                 }
>>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>>                         .getEnricher(ubi.getExchange().toString());
>>>                 enc.enrich(ubi);
>>>                 ub = mergeUB(ub, ubi);
>>>             }
>>>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>>>             String partition = getPartition(ub);
>>>
>>>             // context.write();
>>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>>                     new AvroValue<UberRecord>(ub), baseOutputPath + "/"
>>>                             + partition + "/p" +
>>> reduceAttemptUniqueIdentifier);
>>>         } catch (Exception e) {
>>>             // TODO Auto-generated catch block
>>>             e.printStackTrace();
>>>         }
>>>     }
>>>
>>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>>         List<Field> engFields = EngageData.getClassSchema().getFields();
>>>         for (Field field : fields) {
>>>             if (field.name().equals("engageData")
>>>                     && dest.getEngageData() != null) {
>>>                 EngageData mergedEng = dest.getEngageData();
>>>                 for (Field engField : engFields) {
>>>                     if (dest.getEngageData().get(engField.name()) ==
>>> null) {
>>>                         mergedEng.put(engField.name(),
>>>
>>> src.getEngageData().get(engField.name()));
>>>                     }
>>>
>>>                 }
>>>                 dest.setEngageData(mergedEng);
>>>             } else {
>>>                 if (dest.get(field.name()) == null) {
>>>                     dest.put(field.name(), src.get(field.name()));
>>>                 }
>>>             }
>>>         }
>>>         return dest;
>>>     }
>>>
>>>
>>>
>>>
>


-- 
Deepak

Re: Avromultiple output

Posted by David Ginzburg <da...@gmail.com>.
Thank you for the answer.

I tried that, but the exception
* Error: Found interface org.apache.hadoop.mapreduce.JobContext, but class
was expected*
persists

On Tue, Feb 24, 2015 at 3:12 PM, Artem Ervits <ar...@gmail.com> wrote:

> try this
>
> Job job = Job.getInstance(conf);
> Job.setName(name);
>
> Artem Ervits
> On Feb 21, 2015 10:57 PM, "David Ginzburg" <da...@gmail.com>
> wrote:
>
>> Hi,
>> I am trying to run an MR job on emr with AvromultipleOutput
>>
>>
>>
>>
>> I get the following exception when running with AMI with hadoop 2.2 2.5
>> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
>> expected
>>
>> I read it is related to incompatible hadoop versions, So I modified
>>
>> When running with AMI with hadoop 103 I get the following exception:
>>
>> java.lang.NullPointerException
>>     at
>> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>>     at
>> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>>     at
>> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:415)
>>     at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>>
>>
>> The driver code is
>>
>> Job job = new Job(getConf(), "myad");
>>
>>
>>
>>         job.setOutputValueClass(NullWritable.class);
>>
>>
>>         job.setJarByClass(myAdTextLineMapper.class);
>>         Path inputOflineFiles = new Path(args[0]);
>>         Path inputOfUbberFiles = new Path(args[1]);
>>
>>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>>
>>         job.setMapperClass(myAdTextLineMapper.class);
>>         job.setMapOutputKeyClass(Text.class);
>>         job.setMapOutputValueClass(UberRecord.class);
>>
>>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>>         AvroJob.setOutputKeySchema(job,
>> Schema.create(Schema.Type.STRING));
>>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>>
>>
>>         job.setReducerClass(myAdReducer.class);
>>         job.setOutputKeyClass(Text.class);
>>         job.setOutputValueClass(UberRecord.class);
>>         job.setNumReduceTasks(2);
>>         String baseoutputFolder = args[2];
>>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>>                 baseoutputFolder);
>>         ;
>>
>> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>>
>>         FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>>         return job.waitForCompletion(true) ? 0 : 1;
>>
>>
>> the mapper and reducers
>> @Override
>>     public void setup(Context ctx) {
>>
>>         ubp = new UberRecordProcessor();
>>     }
>>
>>     @Override
>>     protected void map(LongWritable key, Text value, Context context)
>>             throws IOException, InterruptedException {
>>         try {
>>             handleLineinMap(value);
>>             if(ub!=null){
>>                 context.write(new Text(ub.getAuctionId().toString()), ub);
>>                 context.getCounter("myAd",
>> "myAdTextLineMapper").increment(1);
>>             }else{
>>                 context.getCounter("myAd",
>> "myAdTextLineMapperNull").increment(1);
>>             }
>>         } catch (Exception e) {
>>             context.getCounter("myAd",
>> "myAdTextLineMapperError").increment(1);
>>             logger.warn("could not parse line "+value.toString(),e);
>>
>>
>>         }
>>     }
>>
>> public class myAdReducer extends
>>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
>> AvroValue<UberRecord>> {
>>
>>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>>     public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
>>     UberRecordProcessor ubp = new UberRecordProcessor();
>>     // "year=%s/month=%s/day=%s/hour=%s"
>>     private String baseOutputPath;
>>     private long reduceAttemptUniqueIdentifier =
>> System.currentTimeMillis();
>>
>>     // 2015-02-01T18:00:25.673Z
>>     static DateTimeFormatter dateformat = DateTimeFormat
>>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>>
>>     @Override
>>     protected void setup(Context context) throws IOException,
>>             InterruptedException {
>>
>>         amos = new AvroMultipleOutputs(context);
>>         baseOutputPath =
>> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>>
>>     }
>>
>>     @Override
>>     protected void reduce(Text key, Iterable<UberRecord> values, Context
>> context)
>>             throws IOException, InterruptedException {
>>
>>         try {
>>             UberRecord ub = new UberRecord();
>>             for (UberRecord ubi : values) {
>>                 // enrich
>>                 if (ubi.getExchange() == null) {
>>                     continue;
>>                 }
>>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>>                         .getEnricher(ubi.getExchange().toString());
>>                 enc.enrich(ubi);
>>                 ub = mergeUB(ub, ubi);
>>             }
>>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>>             String partition = getPartition(ub);
>>
>>             // context.write();
>>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>>             amos.write(new AvroKey<CharSequence>(key.toString()),
>>                     new AvroValue<UberRecord>(ub), baseOutputPath + "/"
>>                             + partition + "/p" +
>> reduceAttemptUniqueIdentifier);
>>         } catch (Exception e) {
>>             // TODO Auto-generated catch block
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>>         List<Field> fields = UberRecord.getClassSchema().getFields();
>>         List<Field> engFields = EngageData.getClassSchema().getFields();
>>         for (Field field : fields) {
>>             if (field.name().equals("engageData")
>>                     && dest.getEngageData() != null) {
>>                 EngageData mergedEng = dest.getEngageData();
>>                 for (Field engField : engFields) {
>>                     if (dest.getEngageData().get(engField.name()) ==
>> null) {
>>                         mergedEng.put(engField.name(),
>>                                 src.getEngageData().get(engField.name()));
>>                     }
>>
>>                 }
>>                 dest.setEngageData(mergedEng);
>>             } else {
>>                 if (dest.get(field.name()) == null) {
>>                     dest.put(field.name(), src.get(field.name()));
>>                 }
>>             }
>>         }
>>         return dest;
>>     }
>>
>>
>>
>>

Re: Avromultiple output

Posted by Artem Ervits <ar...@gmail.com>.
Try this:

Job job = Job.getInstance(conf);
Job.setName(name);

Artem Ervits
On Feb 21, 2015 10:57 PM, "David Ginzburg" <da...@gmail.com> wrote:

> Hi,
> I am trying to run an MR job on emr with AvromultipleOutput
>
>
>
>
> I get the following exception when running with AMI with hadoop 2.2 2.5
> Found interface org.apache.hadoop.mapreduce.JobContext, but class was
> expected
>
> I read it is related to incompatible hadoop versions, So I modified
>
> When running with AMI with hadoop 103 I get the following exception:
>
> java.lang.NullPointerException
>     at
> org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
>     at
> org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:981)
>     at
> org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:681)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1140)
>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
>
>
> The driver code is
>
> Job job = new Job(getConf(), "myad");
>
>
>
>         job.setOutputValueClass(NullWritable.class);
>
>
>         job.setJarByClass(myAdTextLineMapper.class);
>         Path inputOflineFiles = new Path(args[0]);
>         Path inputOfUbberFiles = new Path(args[1]);
>
>          FileInputFormat.setInputPaths(job, inputOflineFiles);
>
>         job.setMapperClass(myAdTextLineMapper.class);
>         job.setMapOutputKeyClass(Text.class);
>         job.setMapOutputValueClass(UberRecord.class);
>
>         job.setOutputFormatClass(AvroSequenceFileOutputFormat.class);
>         AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING));
>         AvroJob.setOutputValueSchema(job, UberRecord.SCHEMA$);
>
>
>         job.setReducerClass(myAdReducer.class);
>         job.setOutputKeyClass(Text.class);
>         job.setOutputValueClass(UberRecord.class);
>         job.setNumReduceTasks(2);
>         String baseoutputFolder = args[2];
>         job.getConfiguration().set(myAdReducer.BASE_OUTPUT_FOLDER,
>                 baseoutputFolder);
>         ;
>
> LazyOutputFormat.setOutputFormatClass(job,AvroSequenceFileOutputFormat.class);
>
>         FileOutputFormat.setOutputPath(job, new Path(baseoutputFolder));
>         return job.waitForCompletion(true) ? 0 : 1;
>
>
> the mapper and reducers
> @Override
>     public void setup(Context ctx) {
>
>         ubp = new UberRecordProcessor();
>     }
>
>     @Override
>     protected void map(LongWritable key, Text value, Context context)
>             throws IOException, InterruptedException {
>         try {
>             handleLineinMap(value);
>             if(ub!=null){
>                 context.write(new Text(ub.getAuctionId().toString()), ub);
>                 context.getCounter("myAd",
> "myAdTextLineMapper").increment(1);
>             }else{
>                 context.getCounter("myAd",
> "myAdTextLineMapperNull").increment(1);
>             }
>         } catch (Exception e) {
>             context.getCounter("myAd",
> "myAdTextLineMapperError").increment(1);
>             logger.warn("could not parse line "+value.toString(),e);
>
>
>         }
>     }
>
> public class myAdReducer extends
>         Reducer<Text, UberRecord, AvroKey<CharSequence>,
> AvroValue<UberRecord>> {
>
>     private static Logger logger = Logger.getLogger(myAdReducer.class);
>     public static final String BASE_OUTPUT_FOLDER = "base.output.folder";
>     AvroMultipleOutputs amos; MultipleOutputs<Text, UberRecord> outputs;
>     UberRecordProcessor ubp = new UberRecordProcessor();
>     // "year=%s/month=%s/day=%s/hour=%s"
>     private String baseOutputPath;
>     private long reduceAttemptUniqueIdentifier =
> System.currentTimeMillis();
>
>     // 2015-02-01T18:00:25.673Z
>     static DateTimeFormatter dateformat = DateTimeFormat
>             .forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ");
>
>     @Override
>     protected void setup(Context context) throws IOException,
>             InterruptedException {
>
>         amos = new AvroMultipleOutputs(context);
>         baseOutputPath =
> context.getConfiguration().get(BASE_OUTPUT_FOLDER);
>
>     }
>
>     @Override
>     protected void reduce(Text key, Iterable<UberRecord> values, Context
> context)
>             throws IOException, InterruptedException {
>
>         try {
>             UberRecord ub = new UberRecord();
>             for (UberRecord ubi : values) {
>                 // enrich
>                 if (ubi.getExchange() == null) {
>                     continue;
>                 }
>                 BaseBidRequestEnricher enc = BaseBidRequestEnricher
>                         .getEnricher(ubi.getExchange().toString());
>                 enc.enrich(ubi);
>                 ub = mergeUB(ub, ubi);
>             }
>             logger.info("Writing UberRecord [" + ub.toString() + "]");
>             String partition = getPartition(ub);
>
>             // context.write();
>             // AvroKey<CharSequence>, AvroValue<UberRecord>>
>             amos.write(new AvroKey<CharSequence>(key.toString()),
>                     new AvroValue<UberRecord>(ub), baseOutputPath + "/"
>                             + partition + "/p" +
> reduceAttemptUniqueIdentifier);
>         } catch (Exception e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
>
>     public UberRecord mergeUB(UberRecord dest, UberRecord src) {
>         List<Field> fields = UberRecord.getClassSchema().getFields();
>         List<Field> engFields = EngageData.getClassSchema().getFields();
>         for (Field field : fields) {
>             if (field.name().equals("engageData")
>                     && dest.getEngageData() != null) {
>                 EngageData mergedEng = dest.getEngageData();
>                 for (Field engField : engFields) {
>                     if (dest.getEngageData().get(engField.name()) == null)
> {
>                         mergedEng.put(engField.name(),
>                                 src.getEngageData().get(engField.name()));
>                     }
>
>                 }
>                 dest.setEngageData(mergedEng);
>             } else {
>                 if (dest.get(field.name()) == null) {
>                     dest.put(field.name(), src.get(field.name()));
>                 }
>             }
>         }
>         return dest;
>     }
>
>
>
>