Posted to user@crunch.apache.org by Jeff Quinn <je...@nuna.com> on 2015/07/28 02:11:30 UTC

Non Deterministic Record Drops

Hello,

We have observed and replicated strange behavior in our Crunch
application while running on MapReduce via the AWS ElasticMapReduce
service. Running a very simple job which is mostly map-only, we see that an
undetermined subset of records is getting dropped. Specifically, we
expect 30,136,686 output records, but have seen the following counts on
different trials (running over the same data with the same binary):

22,177,119 records
26,435,670 records
22,362,986 records
29,798,528 records

These are all the things about our application that might be unusual and
relevant:

- We use a custom file input format, via From.formattedFile. It looks like
this (basically a carbon copy
of org.apache.hadoop.mapreduce.lib.input.TextInputFormat):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import java.io.IOException;

// Line-based input format keyed by byte offset; mirrors
// org.apache.hadoop.mapreduce.lib.input.TextInputFormat.
public class ByteOffsetInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException,
      InterruptedException {
    // Emit (byte offset, line text) pairs via the standard line reader.
    return new LineRecordReader();
  }
}

- We call org.apache.crunch.Pipeline#read using this InputFormat many
times; for the job in question it is called ~160 times, as the input is
~100 different files. Each file ranges in size from 100MB to 8GB. Our job
uses only this input format for all input files.

- For some files org.apache.crunch.Pipeline#read is called twice on
the same file, and the resulting PTables are processed in different
ways.

- Only the files on which org.apache.crunch.Pipeline#read has been called
more than once during a job have dropped records; all other files
consistently do not have dropped records.

Curious if any Crunch users have experienced similar behavior before,
or if any of these details about my job raise any red flags.

Thanks!

Jeff Quinn

Data Engineer

Nuna

-- 
*DISCLAIMER:* The contents of this email, including any attachments, may 
contain information that is confidential, proprietary in nature, protected 
health information (PHI), or otherwise protected by law from disclosure, 
and is solely for the use of the intended recipient(s). If you are not the 
intended recipient, you are hereby notified that any use, disclosure or 
copying of this email, including any attachments, is unauthorized and 
strictly prohibited. If you have received this email in error, please 
notify the sender of this email. Please delete this and all copies of this 
email from your system. Any opinions either expressed or implied in this 
email and all attachments, are those of its author only, and do not 
necessarily reflect those of Nuna Health, Inc.

Re: Non Deterministic Record Drops

Posted by Jeff Quinn <je...@nuna.com>.
Hi David,

Thanks for taking a look. We have no reason to do multiple reads; multiple
operations on the same PTable should be fine for us. The structure of our
code just made it a bit simpler with multiple reads. Do you think it could
be fundamentally bad, or just bad for performance?

Thanks,

Jeff

On Monday, July 27, 2015, David Ortiz <dp...@gmail.com> wrote:

> Out of curiosity, any reason you went with multiple reads as opposed to
> just performing multiple operations on the same PTable? parallelDo returns
> a new object rather than modifying the initial one, so a single collection
> can start multiple execution flows.
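For reference, a minimal sketch of the pattern David describes (the path
below is hypothetical, not code from the job): read the source once and
branch the resulting PTable, rather than calling read() twice on the same
file. Both flows then share a single input in the planner's graph.

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.TableSource;
import org.apache.crunch.io.From;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

class ReadOnceSketch {
  static void readOnceAndBranch(Pipeline pipeline) {
    TableSource<LongWritable, Text> src = From.formattedFile(
        new Path("/data/input/part-00000"),  // hypothetical input path
        ByteOffsetInputFormat.class, LongWritable.class, Text.class);
    PTable<LongWritable, Text> lines = pipeline.read(src);
    PCollection<Text> flowA = lines.values();        // one downstream flow
    PCollection<LongWritable> flowB = lines.keys();  // a second, independent flow
    // ...apply the different processing to flowA and flowB here...
  }
}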

Re: Non Deterministic Record Drops

Posted by Everett Anderson <ev...@nuna.com>.
Great explanation! Thanks very much!


Re: Non Deterministic Record Drops

Posted by Josh Wills <jw...@cloudera.com>.
Hey Everett,

The bug was specific to the situation where the input was a PTable (never a
PCollection) that was being processed multiple times in a pure map-only
job. The reason is that, internal to the MR planner, an input PTable is
really just a thin wrapper around an input PCollection<Pair<K, V>> (at
least from a job-configuration perspective). The planner keeps track of
which input PCollections it has seen before as it walks the DAG with a
Visitor object that knows how to update an internal graph based on the type
of PCollection visited (input, DoFn, union, GBK, etc.).

Before I added the fix to the BaseInputTable to distinguish it (in an
equals(Object) sense) from a BaseInputCollection, it was possible for the
planner to get confused and assign multiple Vertices in the DAG to the same
underlying input (one for the PCollection version, and one for the PTable
version). Some of the outputs would go to the PCollection, some would go to
the PTable, and unless there was a GBK operation that had both "versions"
as parents, it was possible for the planner to essentially lose either the
PTable or the PCollection vertex when it went to finish the job, which
meant that none of those inputs would get read. The order in which the DAG
is walked isn't deterministic for outputs that are on the same "level"
(i.e., all of the outputs from a map-only job), so the inputs that would
get processed in your jobs would change from run to run depending on the
order in which they showed up in the graph, as you saw.

The change I made ensures that the planner tracks all of the inputs to the
same Vertex in the graph (the one based on the underlying InputCollection
that is wrapped by the InputPTable), so now no inputs get lost. I hope that
helps a little bit.
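
To illustrate the mechanics with a schematic sketch (made-up classes, not
the actual Crunch planner types): vertex identity in a HashMap-based
registry is governed entirely by equals()/hashCode(), so whether one input
yields one vertex or two hinges on exactly the consistency the fix enforces.

import java.util.HashMap;
import java.util.Map;

public class VertexRegistrySketch {

  // Stand-in for an input PCollection, keyed by its source path.
  static class Input {
    final String path;
    Input(String path) { this.path = path; }
    @Override public boolean equals(Object o) {
      return o instanceof Input && ((Input) o).path.equals(path);
    }
    @Override public int hashCode() { return path.hashCode(); }
  }

  // Stand-in for the PTable view: a thin wrapper over the same source.
  static class TableInput extends Input {
    TableInput(String path) { super(path); }
    // Inheriting equals()/hashCode() collapses both views onto one registry
    // key; any inconsistency here would yield two vertices for one input,
    // which is the failure mode described above.
  }

  public static void main(String[] args) {
    Map<Input, String> registry = new HashMap<>();
    registry.putIfAbsent(new Input("s3://bucket/input"), "vertex-1");
    registry.putIfAbsent(new TableInput("s3://bucket/input"), "vertex-2");
    System.out.println(registry.size());  // prints 1: both views, one vertex
  }
}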

J


On Tue, Jul 28, 2015 at 10:36 AM, David Ortiz <do...@videologygroup.com>
wrote:

>  For what it's worth, the optimizer may still read the file more than
> once even if there's only one read in your code.  All depends on what else
> is being done.
>
>  *Sent from my Verizon Wireless 4G LTE DROID*
>  On Jul 28, 2015 1:34 PM, Everett Anderson <ev...@nuna.com> wrote:
>  Thanks, Josh!!
>
>  I'm curious about the fix and didn't fully understand from the
> description.
>
>  What's interesting about the test is that there's only one Pipeline
> read(), but then multiple parallelDo()s on the resulting table, yet you
> still hit the issue. We'd thought it must be due to the multiple reads of
> the same file.
>
>  Would this have happened in other places where multiple operations were
> performed on the same PTable or PCollection, or is it specific to the
> operations performed on objects created directly from a read()?
>
>
>
> On Mon, Jul 27, 2015 at 6:49 PM, Josh Wills <jw...@cloudera.com> wrote:
>
>> That was a deeply satisfying bug. Fix is up here:
>> https://issues.apache.org/jira/browse/CRUNCH-553
>>
>> On Mon, Jul 27, 2015 at 6:29 PM, Jeff Quinn <je...@nuna.com> wrote:
>>
>>> Wow, thanks so much for looking into it. That minimal example
>>> seems accurate. Previously when we dug deeper into which records were
>>> dropped it appeared entire files were being dropped, not just parts of one
>>> file, so that sounds consistent with what you are seeing.
>>>
>>> On Monday, July 27, 2015, Josh Wills <jw...@cloudera.com> wrote:
>>>
>>>>  Hey Jeff,
>>>>
>>>>  Okay cool-- I think I've managed to create a simple test that
>>>> replicates the behavior you're seeing. I can run this test a few different
>>>> times, and sometimes I'll get the correct output, but other times I'll get
>>>> an error b/c no records are processed. I'm going to investigate further and
>>>> see if I can identify the source of the randomness.
>>>>
>>>>  public class RecordDropIT {
>>>>   @Rule
>>>>   public TemporaryPath tmpDir = TemporaryPaths.create();
>>>>
>>>>   @Test
>>>>   public void testMultiReadCount() throws Exception {
>>>>     int numReads = 2;
>>>>     MRPipeline p = new MRPipeline(RecordDropIT.class, tmpDir.getDefaultConfiguration());
>>>>     Path shakes = tmpDir.copyResourcePath("shakes.txt");
>>>>     TableSource<LongWritable, Text> src = From.formattedFile(shakes, TextInputFormat.class, LongWritable.class, Text.class);
>>>>     List<Iterable<Integer>> values = Lists.newArrayList();
>>>>     for (int i = 0; i < numReads; i++) {
>>>>       PCollection<Integer> cnt = p.read(src).parallelDo(new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
>>>>       values.add(cnt.materialize());
>>>>     }
>>>>     for (Iterable<Integer> iter : values) {
>>>>       System.out.println(Iterables.getOnlyElement(iter));
>>>>     }
>>>>     p.done();
>>>>   }
>>>>
>>>>   public static class LineCountFn<T> extends DoFn<T, Integer> {
>>>>
>>>>     private int count = 0;
>>>>
>>>>     @Override
>>>>     public void process(T input, Emitter<Integer> emitter) {
>>>>       count++;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void cleanup(Emitter<Integer> emitter) {
>>>>       emitter.emit(count);
>>>>     }
>>>>   }
>>>> }
>>>>
>>>>
>>>> On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <je...@nuna.com> wrote:
>>>>
>>>>> Hi Josh,
>>>>>
>>>>>  Thanks so much for your suggestions.
>>>>>
>>>>>  The counts are determined with two methods, I am using a simple pig
>>>>> script to count records, and I am also tabulating up the size in bytes of
>>>>> all hdfs output files. Both measures show dropped records / fewer than
>>>>> expected output bytes.
>>>>>
>>>>>  To your second point I will go back and do a sweep for that, but I
>>>>> am fairly sure no DoFns are making use of intermediate state values without
>>>>> getDetachedValue. Our team is aware of the getDetachedValue gotchas as I
>>>>> think it has bitten us before.
>>>>>
>>>>>  Thanks !
>>>>>
>>>>>  Jeff
>>>>>
>>>>>
>>>>> On Monday, July 27, 2015, Josh Wills <jw...@cloudera.com> wrote:
>>>>>
>>>>>>  One more thought-- are any of these DoFns keeping records around as
>>>>>> intermediate state values w/o using PType.getDetachedValue to make copies
>>>>>> of them?
>>>>>>
>>>>>>  J
>>>>>>
>>>>>> On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <jw...@cloudera.com>
>>>>>> wrote:
>>>>>>
>>>>>>>  Hey Jeff,
>>>>>>>
>>>>>>>  Are the counts determined by Counters? Or is it the length of the
>>>>>>> output files? Or both?
>>>>>>>
>>>>>>>  J
>>>>>>>
>>>>>>> On Mon, Jul 27, 2015 at 5:29 PM, David Ortiz <dp...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>  Out of curiosity, any reason you went with multiple reads as
>>>>>>>> opposed to just performing multiple operations on the same PTable?
>>>>>>>> parallelDo returns a new object rather than modifying the initial one, so a
>>>>>>>> single collection can start multiple execution flows.
>>>>>>>>
>>>>>>>>  On Mon, Jul 27, 2015, 8:11 PM Jeff Quinn <je...@nuna.com> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>>
>>>>>>>>>  We have observed and replicated strange behavior with our crunch
>>>>>>>>> application while running on MapReduce via the AWS ElasticMapReduce
>>>>>>>>> service. Running a very simple job which is mostly map only, we see that an
>>>>>>>>> undetermined subset of records are getting dropped. Specifically, we
>>>>>>>>> expect 30,136,686 output records and have seen output on different trials
>>>>>>>>> (running over the same data with the same binary):
>>>>>>>>>
>>>>>>>>> 22,177,119 records
>>>>>>>>> 26,435,670 records
>>>>>>>>> 22,362,986 records
>>>>>>>>> 29,798,528 records
>>>>>>>>>
>>>>>>>>>  These are all the things about our application which might be
>>>>>>>>> unusual and relevant:
>>>>>>>>>
>>>>>>>>>  - We use a custom file input format, via From.formattedFile. It
>>>>>>>>> looks like this (basically a carbon copy
>>>>>>>>> of org.apache.hadoop.mapreduce.lib.input.TextInputFormat):
>>>>>>>>>
>>>>>>>>>  import org.apache.hadoop.io.LongWritable;
>>>>>>>>> import org.apache.hadoop.io.Text;
>>>>>>>>> import org.apache.hadoop.mapreduce.InputSplit;
>>>>>>>>> import org.apache.hadoop.mapreduce.RecordReader;
>>>>>>>>> import org.apache.hadoop.mapreduce.TaskAttemptContext;
>>>>>>>>> import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
>>>>>>>>> import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
>>>>>>>>>
>>>>>>>>> import java.io.IOException;
>>>>>>>>>
>>>>>>>>>  public class ByteOffsetInputFormat extends FileInputFormat<LongWritable, Text> {
>>>>>>>>>
>>>>>>>>>   @Override
>>>>>>>>>   public RecordReader<LongWritable, Text> createRecordReader(
>>>>>>>>>       InputSplit split, TaskAttemptContext context) throws IOException,
>>>>>>>>>       InterruptedException {
>>>>>>>>>     return new LineRecordReader();
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> - We call org.apache.crunch.Pipeline#read using this InputFormat many times, for the job in question it is called ~160 times as the input is ~100 different files. Each file ranges in size from 100MB-8GB. Our job only uses this input format for all input files.
>>>>>>>>>
>>>>>>>>> - For some files org.apache.crunch.Pipeline#read is called twice one the same file, and the resulting PTables are processed in different ways.
>>>>>>>>>
>>>>>>>>> - It is only the data from these files which org.apache.crunch.Pipeline#read has been called on more than once during a job that have dropped records, all other files consistently do not have dropped records
>>>>>>>>>
>>>>>>>>> Curious if any Crunch users have experienced similar behavior before, or if any of these details about my job raise any red flags.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> Jeff Quinn
>>>>>>>>>
>>>>>>>>> Data Engineer
>>>>>>>>>
>>>>>>>>> Nuna
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *DISCLAIMER:* The contents of this email, including any
>>>>>>>>> attachments, may contain information that is confidential, proprietary in
>>>>>>>>> nature, protected health information (PHI), or otherwise protected by law
>>>>>>>>> from disclosure, and is solely for the use of the intended recipient(s). If
>>>>>>>>> you are not the intended recipient, you are hereby notified that any use,
>>>>>>>>> disclosure or copying of this email, including any attachments, is
>>>>>>>>> unauthorized and strictly prohibited. If you have received this email in
>>>>>>>>> error, please notify the sender of this email. Please delete this and all
>>>>>>>>> copies of this email from your system. Any opinions either expressed or
>>>>>>>>> implied in this email and all attachments, are those of its author only,
>>>>>>>>> and do not necessarily reflect those of Nuna Health, Inc.
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>   --
>>>>>>>  Director of Data Science
>>>>>>> Cloudera <http://www.cloudera.com>
>>>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>  --
>>>>>>  Director of Data Science
>>>>>> Cloudera <http://www.cloudera.com>
>>>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>>>
>>>>>
>>>>> *DISCLAIMER:* The contents of this email, including any attachments,
>>>>> may contain information that is confidential, proprietary in nature,
>>>>> protected health information (PHI), or otherwise protected by law from
>>>>> disclosure, and is solely for the use of the intended recipient(s). If you
>>>>> are not the intended recipient, you are hereby notified that any use,
>>>>> disclosure or copying of this email, including any attachments, is
>>>>> unauthorized and strictly prohibited. If you have received this email in
>>>>> error, please notify the sender of this email. Please delete this and all
>>>>> copies of this email from your system. Any opinions either expressed or
>>>>> implied in this email and all attachments, are those of its author only,
>>>>> and do not necessarily reflect those of Nuna Health, Inc.
>>>>>
>>>>
>>>>
>>>>
>>>>  --
>>>>  Director of Data Science
>>>> Cloudera <http://www.cloudera.com>
>>>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>>>
>>>
>>> *DISCLAIMER:* The contents of this email, including any attachments,
>>> may contain information that is confidential, proprietary in nature,
>>> protected health information (PHI), or otherwise protected by law from
>>> disclosure, and is solely for the use of the intended recipient(s). If you
>>> are not the intended recipient, you are hereby notified that any use,
>>> disclosure or copying of this email, including any attachments, is
>>> unauthorized and strictly prohibited. If you have received this email in
>>> error, please notify the sender of this email. Please delete this and all
>>> copies of this email from your system. Any opinions either expressed or
>>> implied in this email and all attachments, are those of its author only,
>>> and do not necessarily reflect those of Nuna Health, Inc.
>>>
>>
>>
>>
>>  --
>>  Director of Data Science
>> Cloudera <http://www.cloudera.com>
>> Twitter: @josh_wills <http://twitter.com/josh_wills>
>>
>
>
> *DISCLAIMER:* The contents of this email, including any attachments, may
> contain information that is confidential, proprietary in nature, protected
> health information (PHI), or otherwise protected by law from disclosure,
> and is solely for the use of the intended recipient(s). If you are not the
> intended recipient, you are hereby notified that any use, disclosure or
> copying of this email, including any attachments, is unauthorized and
> strictly prohibited. If you have received this email in error, please
> notify the sender of this email. Please delete this and all copies of this
> email from your system. Any opinions either expressed or implied in this
> email and all attachments, are those of its author only, and do not
> necessarily reflect those of Nuna Health, Inc.
> *This email is intended only for the use of the individual(s) to whom it
> is addressed. If you have received this communication in error, please
> immediately notify the sender and delete the original email.*
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Non Deterministic Record Drops

Posted by David Ortiz <do...@videologygroup.com>.
For what it's worth, the optimizer may still read the file more than once even if there's only one read in your code. It all depends on what else is being done.

Sent from my Verizon Wireless 4G LTE DROID
On Jul 28, 2015 1:34 PM, Everett Anderson <ev...@nuna.com> wrote:
Thanks, Josh!!

I'm curious about the fix and didn't fully understand it from the description.

What's interesting about the test is that there's only one Pipeline read(), but then multiple parallelDo()s on the resulting table, yet you still hit the issue. We'd thought it must be due to the multiple reads of the same file.

Would this have happened in other places where multiple operations were performed on the same PTable or PCollection, or is it specific to the operations performed on objects created directly from a read()?



On Mon, Jul 27, 2015 at 6:49 PM, Josh Wills <jw...@cloudera.com> wrote:
That was a deeply satisfying bug. Fix is up here: https://issues.apache.org/jira/browse/CRUNCH-553

On Mon, Jul 27, 2015 at 6:29 PM, Jeff Quinn <je...@nuna.com> wrote:
Wow, thanks so much for looking into it. That minimal example seems accurate. Previously when we dug deeper into which records were dropped, it appeared entire files were being dropped, not just parts of one file, so that sounds consistent with what you are seeing.

On Monday, July 27, 2015, Josh Wills <jw...@cloudera.com> wrote:
Hey Jeff,

Okay cool-- I think I've managed to create a simple test that replicates the behavior you're seeing. I can run this test a few different times, and sometimes I'll get the correct output, but other times I'll get an error b/c no records are processed. I'm going to investigate further and see if I can identify the source of the randomness.


// Reads the same source twice in one pipeline; each flow should print the
// full line count of shakes.txt.
public class RecordDropIT {
  @Rule
  public TemporaryPath tmpDir = TemporaryPaths.create();

  @Test
  public void testMultiReadCount() throws Exception {
    int numReads = 2;
    MRPipeline p = new MRPipeline(RecordDropIT.class, tmpDir.getDefaultConfiguration());
    Path shakes = tmpDir.copyResourcePath("shakes.txt");
    TableSource<LongWritable, Text> src = From.formattedFile(shakes, TextInputFormat.class, LongWritable.class, Text.class);
    List<Iterable<Integer>> values = Lists.newArrayList();
    for (int i = 0; i < numReads; i++) {
      // Each read() of the same source starts a separate flow.
      PCollection<Integer> cnt = p.read(src).parallelDo(new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
      values.add(cnt.materialize());
    }
    for (Iterable<Integer> iter : values) {
      System.out.println(Iterables.getOnlyElement(iter));
    }
    p.done();
  }

  // Counts the records it sees and emits the single total from cleanup().
  public static class LineCountFn<T> extends DoFn<T, Integer> {

    private int count = 0;

    @Override
    public void process(T input, Emitter<Integer> emitter) {
      count++;
    }

    @Override
    public void cleanup(Emitter<Integer> emitter) {
      emitter.emit(count);
    }
  }
}

On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <je...@nuna.com> wrote:
Hi Josh,

Thanks so much for your suggestions.

The counts are determined with two methods: I am using a simple pig script to count records, and I am also tallying the size in bytes of all HDFS output files. Both measures show dropped records / fewer than expected output bytes.
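
A hedged sketch of a third possible cross-check (CountingFn and the counter
names are illustrative, not from the job): an identity DoFn can bump a
Hadoop counter for every record it emits, and the totals show up in the MR
job's counters.

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

// Identity DoFn that counts every record it passes through.
public class CountingFn<T> extends DoFn<T, T> {
  @Override
  public void process(T input, Emitter<T> emitter) {
    increment("debug", "outputRecords");  // group and counter names are made up
    emitter.emit(input);
  }
}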

To your second point, I will go back and do a sweep for that, but I am fairly sure no DoFns are making use of intermediate state values without getDetachedValue. Our team is aware of the getDetachedValue gotchas, as I think they have bitten us before.

Thanks!

Jeff


On Monday, July 27, 2015, Josh Wills <jw...@cloudera.com> wrote:
One more thought-- are any of these DoFns keeping records around as intermediate state values w/o using PType.getDetachedValue to make copies of them?

J
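
For readers hitting this thread later, a hedged sketch of the hazard Josh
is pointing at (the DoFn below is illustrative): Hadoop reuses Writable
instances across calls, so a reference held between process() calls must be
a detached copy.

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.io.Text;

// Remembers the previous record. Without getDetachedValue, "previous" would
// point at the reused Writable and always equal the current input.
public class PairWithPreviousFn extends DoFn<Text, String> {
  private final PType<Text> ptype = Writables.writables(Text.class);
  private Text previous;

  @Override
  public void process(Text input, Emitter<String> emitter) {
    if (previous != null) {
      emitter.emit(previous + " -> " + input);
    }
    // Deep copy that is safe to keep; depending on the Crunch version the
    // PType may need to be initialized first, so treat this as a sketch.
    previous = ptype.getDetachedValue(input);
  }
}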

On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <jw...@cloudera.com> wrote:
Hey Jeff,

Are the counts determined by Counters? Or is it the length of the output files? Or both?

J


Re: Non Deterministic Record Drops

Posted by Everett Anderson <ev...@nuna.com>.
Thanks, Josh!!

I'm curious about the fix and didn't fully understand it from the description.

What's interesting about the test is that there's only one Pipeline read(),
but then multiple parallelDo()s on the resulting table, yet you still hit
the issue. We'd thought it must be due to the multiple reads of the same
file.

Would this have happened in other places where multiple operations were
performed on the same PTable or PCollection, or is it specific to the
operations performed on objects created directly from a read()?




Re: Non Deterministic Record Drops

Posted by Josh Wills <jw...@cloudera.com>.
That was a deeply satisfying bug. Fix is up here:
https://issues.apache.org/jira/browse/CRUNCH-553

On Mon, Jul 27, 2015 at 6:29 PM, Jeff Quinn <je...@nuna.com> wrote:

> [quoted thread trimmed]



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

Re: Non Deterministic Record Drops

Posted by Jeff Quinn <je...@nuna.com>.
Wow, thanks so much for looking into it. That minimal example seems
accurate. Previously, when we dug deeper into which records were dropped,
it appeared that entire files were being dropped, not just parts of one
file, so that is consistent with what you are seeing.

On Monday, July 27, 2015, Josh Wills <jw...@cloudera.com> wrote:

> [quoted thread and disclaimer trimmed]

Re: Non Deterministic Record Drops

Posted by Josh Wills <jw...@cloudera.com>.
Hey Jeff,

Okay cool-- I think I've managed to create a simple test that replicates
the behavior you're seeing. I can run this test a few different times:
sometimes I'll get the correct output, but other times I'll get an error
because no records are processed. I'm going to investigate further and see
if I can identify the source of the randomness.

import java.util.List;

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pair;
import org.apache.crunch.TableSource;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Rule;
import org.junit.Test;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

public class RecordDropIT {
  @Rule
  public TemporaryPath tmpDir = TemporaryPaths.create();

  @Test
  public void testMultiReadCount() throws Exception {
    int numReads = 2;
    MRPipeline p = new MRPipeline(RecordDropIT.class, tmpDir.getDefaultConfiguration());
    Path shakes = tmpDir.copyResourcePath("shakes.txt");
    TableSource<LongWritable, Text> src = From.formattedFile(
        shakes, TextInputFormat.class, LongWritable.class, Text.class);
    List<Iterable<Integer>> values = Lists.newArrayList();
    // Read the same source twice, mirroring the reported job structure.
    for (int i = 0; i < numReads; i++) {
      PCollection<Integer> cnt = p.read(src).parallelDo(
          new LineCountFn<Pair<LongWritable, Text>>(), Writables.ints());
      values.add(cnt.materialize());
    }
    // Each read should report the same line count for shakes.txt.
    for (Iterable<Integer> iter : values) {
      System.out.println(Iterables.getOnlyElement(iter));
    }
    p.done();
  }

  // Counts the records seen by each map task, emitting one total in cleanup.
  public static class LineCountFn<T> extends DoFn<T, Integer> {

    private int count = 0;

    @Override
    public void process(T input, Emitter<Integer> emitter) {
      count++;
    }

    @Override
    public void cleanup(Emitter<Integer> emitter) {
      emitter.emit(count);
    }
  }
}
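
(A note on reading the test: LineCountFn emits its total only in cleanup(),
so each map task contributes exactly one count, and Iterables.getOnlyElement
assumes shakes.txt lands in a single input split. The intermittent "no
records processed" failure presumably shows up as one of the materialized
iterables coming back empty when a duplicate read is dropped.)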


On Mon, Jul 27, 2015 at 6:11 PM, Jeff Quinn <je...@nuna.com> wrote:

> [quoted thread trimmed]

Re: Non Deterministic Record Drops

Posted by Jeff Quinn <je...@nuna.com>.
Hi Josh,

Thanks so much for your suggestions.

The counts are determined with two methods: I am using a simple Pig script
to count records, and I am also tabulating the size in bytes of all HDFS
output files. Both measures show dropped records and fewer than expected
output bytes.
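
A third cross-check can live inside the pipeline itself. The sketch below
is illustrative (the group and counter names are invented): Crunch's DoFn
exposes increment(group, counter), so a pass-through DoFn applied just
before each write surfaces a per-output record count in the MapReduce job
history, e.g. col.parallelDo(new CountingFn<MyType>(), col.getPType()).

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

// Pass-through DoFn that tallies every record it sees in a Hadoop
// counter, then re-emits the record unchanged.
public static class CountingFn<T> extends DoFn<T, T> {
  @Override
  public void process(T input, Emitter<T> emitter) {
    increment("QA", "RECORDS_SEEN");  // invented group/counter names
    emitter.emit(input);
  }
}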

To your second point, I will go back and do a sweep for that, but I am
fairly sure no DoFns are making use of intermediate state values without
getDetachedValue. Our team is aware of the getDetachedValue gotchas, as I
think they have bitten us before.

Thanks!

Jeff

On Monday, July 27, 2015, Josh Wills <jw...@cloudera.com> wrote:

> [quoted thread and disclaimer trimmed]

Re: Non Deterministic Record Drops

Posted by Josh Wills <jw...@cloudera.com>.
One more thought-- are any of these DoFns keeping records around as
intermediate state values without using PType.getDetachedValue to make
copies of them?

J
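
For anyone following along: Hadoop's serialization layer reuses the same
object instance across calls to process, so a DoFn that caches a reference
without copying can end up with every cached element aliasing the last
record read. Below is a minimal sketch of the safe pattern; the class and
its longest-line logic are illustrative, not from Jeff's job.

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.io.Text;

// Keeps the longest line seen by each map task, copying defensively.
public static class LongestLineFn extends DoFn<Text, Text> {
  private final PType<Text> ptype;
  private Text longest;

  public LongestLineFn(PType<Text> ptype) {
    this.ptype = ptype;
  }

  @Override
  public void initialize() {
    ptype.initialize(getConfiguration());  // needed before getDetachedValue
  }

  @Override
  public void process(Text input, Emitter<Text> emitter) {
    if (longest == null || input.getLength() > longest.getLength()) {
      // Without getDetachedValue, `longest` would alias Hadoop's reused
      // Text instance and silently track the current record instead.
      longest = ptype.getDetachedValue(input);
    }
  }

  @Override
  public void cleanup(Emitter<Text> emitter) {
    if (longest != null) {
      emitter.emit(longest);
    }
  }
}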

On Mon, Jul 27, 2015 at 5:47 PM, Josh Wills <jw...@cloudera.com> wrote:

> [quoted thread trimmed]

Re: Non Deterministic Record Drops

Posted by Josh Wills <jw...@cloudera.com>.
Hey Jeff,

Are the counts determined by Counters? Or is it the length of the output
files? Or both?

J

On Mon, Jul 27, 2015 at 5:29 PM, David Ortiz <dp...@gmail.com> wrote:

> [quoted thread trimmed]

Re: Non Deterministic Record Drops

Posted by David Ortiz <dp...@gmail.com>.
Out of curiosity, any reason you went with multiple reads as opposed to
just performing multiple operations on the same PTable? parallelDo returns
a new object rather than modifying the initial one, so a single collection
can start multiple execution flows.
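
To make that concrete, here is a hedged sketch (the pipeline variable and
input path are illustrative; ByteOffsetInputFormat is the format from
Jeff's original mail): read the file once and branch the resulting PTable,
instead of calling Pipeline#read twice on the same path.

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.io.From;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

// One read, two independent downstream flows.
PTable<LongWritable, Text> lines = pipeline.read(
    From.formattedFile(new Path("/input/some-file"),  // illustrative path
        ByteOffsetInputFormat.class, LongWritable.class, Text.class));

// values() and parallelDo() return new collections; `lines` itself is
// never modified, so each flow can be processed differently.
PCollection<Text> flowA = lines.values();
PCollection<Text> flowB = lines.values();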

On Mon, Jul 27, 2015, 8:11 PM Jeff Quinn <je...@nuna.com> wrote:

> [quoted original message trimmed]