Posted to user@crunch.apache.org by Kristoffer Sjögren <st...@gmail.com> on 2014/06/02 13:53:04 UTC

Avro+Parquet

Hi

I'm trying to read and write data using the Avro+Parquet combo that
ships with Crunch 0.8.0-cdh4.3.0.

- The writer job looks like this.

PCollection<String> lines = ...
PCollection<User> p = lines.parallelDo(new DoFn<String, User>() {
  @Override
  public void process(String input, Emitter<User> emitter) {
    User user = User.newBuilder().setName(input).build();
    emitter.emit(user);
  }
}, Avros.records(User.class));

AvroParquetFileSourceTarget<User> fout =
    new AvroParquetFileSourceTarget<User>(out, Avros.records(User.class));
pipeline.write(p, fout);

- The reader job looks like this.

AvroParquetFileSource<User> file =
    new AvroParquetFileSource<User>(out, Avros.records(User.class));
PCollection<User> users = pipeline.read(file);
// this line fails with a ClassCastException
PCollection<String> lines = users.parallelDo(new DoFn<User, String>() {
  @Override
  public void process(User user, Emitter<String> emitter) {
    emitter.emit(user.getName().toString());
  }
}, Writables.strings());


However, the reader fails with a java.lang.ClassCastException (stack
trace below). Is this a known issue or am I doing something wrong?

Cheers,
-Kristoffer


java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to mapred.jobs.User
at mapred.jobs.ParquetReaderJob$1.process(ParquetReaderJob.java:22)
at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
at org.apache.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:56)
at org.apache.crunch.MapFn.process(MapFn.java:34)
at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:99)
at org.apache.crunch.impl.mr.run.RTNode.process(RTNode.java:110)
at org.apache.crunch.impl.mr.run.CrunchMapper.map(CrunchMapper.java:60)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:672)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)

Re: Avro+Parquet

Posted by Josh Wills <jo...@gmail.com>.
Ah, that's a good one. Glad it's working now.


On Wed, Jun 4, 2014 at 7:38 AM, Kristoffer Sjögren <st...@gmail.com> wrote:

> Sorry for the late reply.
>
> It was my fault. A refactoring changed the Java package name without
> changing the Avro schema namespace in the SCHEMA$ field, which caused
> Avro to fall back on generic records. Works fine now!

Re: Avro+Parquet

Posted by Kristoffer Sjögren <st...@gmail.com>.
Sorry for the late reply.

It was my fault. A refactoring changed the Java package name without
changing the Avro schema namespace in the SCHEMA$ field, which caused
Avro to fall back on generic records. Works fine now!
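
For anyone who hits the same error: the lookup that goes wrong here can be
modeled with the JDK alone. The sketch below is a simplified stand-in, not
Avro's or Crunch's actual code, and the names SpecificClassLookup, fullName
and resolveRecordClass are made up for illustration. It assumes the reader
looks for a class named after the schema's namespace plus record name and
falls back to a generic record when no such class is found.

```java
// Simplified, JDK-only model of the class lookup; NOT Avro's actual code.
// SpecificClassLookup, fullName and resolveRecordClass are hypothetical names.
final class SpecificClassLookup {

  /** Builds a schema-style full name: namespace + "." + record name. */
  static String fullName(String namespace, String name) {
    return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
  }

  /**
   * Returns the class whose name equals the schema's full name, or null when
   * no such class is on the classpath. A null result models the case where a
   * reader would have to fall back to a generic record.
   */
  static Class<?> resolveRecordClass(String namespace, String name) {
    try {
      return Class.forName(fullName(namespace, name));
    } catch (ClassNotFoundException e) {
      return null;
    }
  }

  public static void main(String[] args) {
    // Stand-in for a schema whose namespace matches an existing class.
    System.out.println(resolveRecordClass("java.lang", "String"));
    // Stand-in for a schema that still carries a package name from before a
    // refactoring: no class of that name exists, so the lookup yields null.
    System.out.println(resolveRecordClass("old.pkg", "User"));
  }
}
```

With Avro on the classpath, a comparable sanity check would be to compare
User.SCHEMA$.getFullName() with User.class.getName() before running the job;
if the two differ, the ClassCastException above is the likely outcome.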

On Tue, Jun 3, 2014 at 1:15 PM, Kristoffer Sjögren <st...@gmail.com> wrote:
> Thanks for the quick answer! My initial test still fails, but I may
> have done something wrong here. I will do a more thorough test ASAP.

Re: Avro+Parquet

Posted by Kristoffer Sjögren <st...@gmail.com>.
Thanks for the quick answer! My initial test still fails, but I may
have done something wrong here. I will do a more thorough test ASAP.

On Mon, Jun 2, 2014 at 2:53 PM, Micah Whitacre <mk...@gmail.com> wrote:
> I don't believe it is a known issue. I modified a test in
> AvroParquetPipelineIT[1] to verify the output to a target using a source.

Re: Avro+Parquet

Posted by Micah Whitacre <mk...@gmail.com>.
I don't believe it is a known issue. I modified a test in
AvroParquetPipelineIT[1] to verify the output to a target using a source:

  @Test
  public void toAvroParquetFileTargetFromParquet() throws Exception {
    GenericRecord savedRecord = new GenericData.Record(Person.SCHEMA$);
    savedRecord.put("name", "John Doe");
    savedRecord.put("age", 42);
    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
    populateGenericParquetFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);

    Pipeline pipeline = new MRPipeline(AvroParquetPipelineIT.class,
        tmpDir.getDefaultConfiguration());
    PCollection<Person> genericCollection = pipeline.read(
        new AvroParquetFileSource<Person>(new Path(avroFile.getAbsolutePath()),
            Avros.records(Person.class)));
    File outputFile = tmpDir.getFile("output");
    Target parquetFileTarget = new AvroParquetFileTarget(outputFile.getAbsolutePath());
    pipeline.write(genericCollection, parquetFileTarget);
    pipeline.run();

    Person person = genericCollection.materialize().iterator().next();

    PCollection<Person> persistedCollection = pipeline.read(
        new AvroParquetFileSource<Person>(new Path(outputFile.getAbsolutePath()),
            Avros.records(Person.class)));
    Person persistedPerson = persistedCollection.materialize().iterator().next();

    Path parquetFile = new Path(new File(outputFile, "part-m-00000.parquet").getPath());

    AvroParquetReader<Person> reader = new AvroParquetReader<Person>(parquetFile);

    try {
      Person readPerson = reader.read();
      assertThat(readPerson, is(person));
      assertThat(readPerson, is(persistedPerson));
    } finally {
      reader.close();
    }
  }

The test passes without any issues. There have been a number of fixes
since the 0.8.0-cdh4.3.0 version. You might try upgrading to the latest
version available (0.8.2+71-cdh4.6.0) and see if the problem still exists.
If it does, a JUnit/integration test that reproduces it would help debug
this issue.


[1] https://github.com/apache/crunch/blob/1d9b6cf3db6daa1ee6e0fa48dfd5966e821c71a3/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetPipelineIT.java#L120

