Posted to user@crunch.apache.org by Josh Wills <jw...@cloudera.com> on 2014/12/04 23:21:55 UTC

Re: Multiple Reduces in a Single Crunch Job

Danny,

Spent a couple of hours today banging on this by hacking on some
integration tests but couldn't replicate it. However, I just took a closer
look at the plan you posted, and I noticed that all of the files you are
writing out are prefixed w/ "hdfs:/" except for the /tmp/crunch-* file that
Crunch is creating; is it possible that Crunch is creating the temp file
locally on your client machine for some reason? I can't think of why that
would happen off the top of my head, but if that is the problem, I'll at
least be able to figure out where to look.

Josh
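
For readers following the "/tmp/crunch-*" versus "hdfs:/" observation above: a path with no scheme is ambiguous until it is resolved against some default filesystem. The sketch below shows that ambiguity using only java.net.URI. It is an illustration under stated assumptions, not Hadoop's Path/FileSystem code and not what Crunch does internally; the PathQualifier class and its qualify method are hypothetical names.

```java
import java.net.URI;

// Hypothetical helper for illustration only; this is not Hadoop's
// Path/FileSystem logic and not Crunch's temp-directory code.
final class PathQualifier {

    /** Returns the path unchanged if it names a scheme, else resolves it against defaultFs. */
    static URI qualify(String path, URI defaultFs) {
        URI candidate = URI.create(path);
        if (candidate.getScheme() != null) {
            return candidate;                 // already qualified, e.g. hdfs:///tmp/...
        }
        return defaultFs.resolve(candidate);  // scheme is taken from the default filesystem
    }

    public static void main(String[] args) {
        String tmp = "/tmp/crunch-1279941375/p1";  // temp path from the error in this thread
        URI onCluster = qualify(tmp, URI.create("hdfs:///"));
        URI onClient = qualify(tmp, URI.create("file:///"));
        // Same path text, two different filesystems depending on the default.
        System.out.println(onCluster.getScheme() + " " + onCluster.getPath());
        System.out.println(onClient.getScheme() + " " + onClient.getPath());
    }
}
```

If the writer and the reader of the temp file resolve the same scheme-less path against different defaults, the reader would look in a location the writer never touched, which would be consistent with an "Input path does not exist" error. Whether that is what happens here is not established in this thread.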


On Tue, Nov 25, 2014 at 6:30 PM, Danny Morgan <un...@hotmail.com>
wrote:

> No problem, Happy Thanksgiving!
>
> Gobble Gobble...
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 18:23:14 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Very useful-- thank you. Will dig into it and report back, although I'm
> heading out for the holiday so it likely won't be until early next week.
>
> J
>
> On Tue, Nov 25, 2014 at 6:16 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
>  Having a single pipeline in the application didn't fix it. Sticking a
> pipeline.run() in the middle didn't matter either; from the plan, it looks
> like the planner is completely ignoring the second run() I added.
>
> However what DOES WORK is if I do:
>
> collection = secondarySort()
> pipeline.cache(collection)
> pipeline.run()
> newcollection = collection.groupByKey()
>
> If I try adding the cache() without calling run() in between it doesn't
> work. Hope that's enough info for you to fix the possible planner bug.
>
> Thanks for the help Josh!
>
> ------------------------------
> From: unluckyboy@hotmail.com
> To: user@crunch.apache.org
> Subject: RE: Multiple Reduces in a Single Crunch Job
> Date: Wed, 26 Nov 2014 01:58:11 +0000
>
>
> I tried doing a Sample() instead of the identity function, but that got
> fused into the reduce as well and didn't work.
>
> The first thing I tried was sticking a pipeline.run() in between there, and
> to my surprise it didn't work either: same error. I'll rerun that config
> now and try to get the dot files for the plan.
>
> Not sure if this is affecting it, but in the same Crunch application I have
> a completely independent pipeline that runs before this one executes. I'll
> turn that off as well and see if it's causing the issue.
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 17:43:52 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Drat, I was hoping it was something simple. You could manually fix it by
> injecting a pipeline.run() call between the secondarySort and the
> groupByKey(), but of course, we'd like to handle this situation correctly
> by default.
>
> J
>
> On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> I did a parallelDo with the IdentityFn of the output of the secondarySort
> and the IdentityFn was just fused into the reduce phase of the
> secondarySort and I got the same error message.
>
> I think you want me to somehow force a map phase in between the two
> reduces?
>
> -Danny
>
> ------------------------------
> From: josh.wills@gmail.com
> Date: Tue, 25 Nov 2014 17:23:29 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Oh, dumb question-- if you put like a dummy function between the
> secondarySort and the groupByKey, like an IdentityFn or something, do
> things work again? That would help w/diagnosing the problem.
>
> On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <jw...@cloudera.com> wrote:
>
> So if you're getting it quickly, it might be b/c the job isn't recognizing
> the dependency between the two separate phases of the job for some reason
> (e.g., it's not realizing that one job has to be run before the other one.)
> That's an odd situation, but we have had bugs like that in the past; let me
> see if I can re-create the situation in an integration test. Which version
> of Crunch?
>
> J
>
> On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> No that's definitely not it. I get this issue if I write to a single
> output as well.
>
> If I remove the groupByKey().combineValues() line and just write out the
> output from the SecondarySort it works. Seems to only complain about the
> temp path not existing when I have multiple reduce phases in the pipeline.
> Also the error seems to happen immediately during the setup or planning
> phase, I assume this because the yarn jobs get created but they don't do
> anything, and instead of FAILED the error message is "Application killed by
> user."
>
> -Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 16:30:58 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Ack, sorry-- it's this: https://issues.apache.org/jira/browse/CRUNCH-481
>
> On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> Hello Again Josh,
>
> The link to the Jira issue you sent out seems to be cut off, could you
> please resend it?
>
> I deleted the line where I write the collection to a text file, and
> retried it but it didn't work either. Also tried writing the collection out
> as Avro instead of Parquet, but got the same error.
>
> Here's the rest of the stack trace:
>
> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
>         at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
>         at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
>         at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
>         at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
>         at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
>         at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
>         at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
>         at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
>         at java.lang.Thread.run(Thread.java:744)
>
> Thanks Josh!
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 16:10:33 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
>
> Hey Danny,
>
> I'm wondering if this is caused by
> https://issues.apache.org/jira/browse/CRUNCH-481-- I think we use
> different output committers for text files vs. parquet files, so at least
> one of the outputs won't be written properly-- does that make sense?
>
> Josh
>
> On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> Hi Crunchers,
>
> I've attached a pdf of what my plan looks like. I've run into this problem
> before where I have multiple reduce steps chained together in a single
> pipeline and always get the same error.
>
> In the case of the attached pdf the error is "org.apache.hadoop.mapreduce.lib.input.InvalidInputException:
> Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
>
> That's the temp directory the crunch planner set up for the first reduce
> phase.
>
> Can I run multiple chained reduces within the same pipeline? Do I have to
> manually write out the output from the first reduce?
>
> Here's what the code looks like:
>
>       // Simple mapper
>       PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
>       // Secondary sort happens here
>       PTable<Danny, Long> second = Danny.extractDannys(first);
>       // Regular group by
>       PTable<Danny, Long> third =
>           second.groupByKey().combineValues(Aggregators.SUM_LONGS());
>       // Simple function that populates some fields in the Danny object
>       // with the aggregate results
>       PCollection<Pair<Danny, String>> done = Danny.finalize(third);
>       Pair<PCollection<Danny>, PCollection<String>> splits =
>           Channels.split(done);
>       splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
>       Target pq_danny = new AvroParquetFileTarget(pqPath);
>       splits.first().write(pq_danny, WriteMode.OVERWRITE);
>
> Thanks!
>
> -Danny
>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

RE: Multiple Reduces in a Single Crunch Job

Posted by Danny Morgan <un...@hotmail.com>.
Sorry for the delayed response. Yes, I'm explicitly creating a Configuration object, setting some Avro compression options on it, and then passing it to the MRPipeline constructor.

I haven't switched over to the Compress helper class that was added in 0.11 yet.

-Danny
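
One thing worth checking in a setup like this is which default filesystem the hand-built configuration actually carries. Hadoop resolves scheme-less paths against the fs.defaultFS property, whose built-in default is file:/// when no site configuration overrides it. The sketch below uses java.util.Properties as a stand-in for Hadoop's Configuration so that it runs without any Hadoop jars; the DefaultFsCheck class, its methods, and the choice of "avro.output.codec" as the example compression option are assumptions made for illustration, and nothing in this thread confirms that this is the cause of the error.

```java
import java.util.Properties;

// Stand-in sketch: Properties plays the role of Hadoop's Configuration here.
// Class and method names are hypothetical.
final class DefaultFsCheck {

    // Hadoop's built-in default for fs.defaultFS when no site file sets it.
    static final String HADOOP_DEFAULT_FS = "file:///";

    /** Returns the default filesystem this configuration would resolve paths against. */
    static String effectiveDefaultFs(Properties conf) {
        return conf.getProperty("fs.defaultFS", HADOOP_DEFAULT_FS);
    }

    /** True if scheme-less paths such as /tmp/crunch-* would land on the local filesystem. */
    static boolean tempFilesStayLocal(Properties conf) {
        return effectiveDefaultFs(conf).startsWith("file:");
    }

    public static void main(String[] args) {
        // A configuration that only carries job options, with no cluster settings.
        Properties bare = new Properties();
        bare.setProperty("avro.output.codec", "snappy");  // illustrative option only

        // The same options plus the cluster's default filesystem.
        Properties cluster = new Properties();
        cluster.setProperty("avro.output.codec", "snappy");
        cluster.setProperty("fs.defaultFS", "hdfs:///");

        System.out.println("bare stays local: " + tempFilesStayLocal(bare));
        System.out.println("cluster stays local: " + tempFilesStayLocal(cluster));
    }
}
```

With a real Configuration, the equivalent check would be to log the value of fs.defaultFS just before constructing the pipeline and compare it with the scheme shown in the plan output.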

From: josh.wills@gmail.com
Date: Fri, 5 Dec 2014 00:59:00 +0000
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

I think it's a bug, or at least, a configuration issue. When you construct the MRPipeline, are you explicitly passing it a Configuration object?
On Thu, Dec 4, 2014 at 6:24 PM Danny Morgan <un...@hotmail.com> wrote:



Hi Josh,

Sorry, I mixed up pipelines; there is no S3 write in this case.

So you are correct: the intermediate Avro file that's the output of the SecondarySort is labeled "/tmp". I don't manually create this local file; the Crunch planner seems to insert that materialization phase. If you refer back to my original email, the error I get is:

"org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"

So the dot plan has the file labeled as "/tmp/crunch-*"; however, when the job runs it expects to find "hdfs:///tmp/crunch-*". Is this a labeling issue with the plan output, or might this be the bug?

-Danny

From: jwills@cloudera.com
Date: Thu, 4 Dec 2014 15:44:37 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Hey Danny,

Inlined.

On Thu, Dec 4, 2014 at 3:20 PM, Danny Morgan <un...@hotmail.com> wrote:

> Hi Josh,
>
> Thanks for taking the time to look into this.
>
> I do get a PCollection<Object, String> and split it. I write the Avro objects as Parquet to HDFS, and I write the String collection out to s3n://. I have noticed that the s3n:// targets copy their files to the local filesystem's /tmp and then copy the files up to S3. This process happens serially and is super slow; I'm not sure if it's a Crunch issue or a general HDFS one.

I'm not following; I'm referring to the second_phase.pdf plan file, which has a bunch of Avro inputs that are being merged together and secondary sorted (some sort of sessionization, I assume) followed by a GBK/combineValues and then the write to Parquet. Where does the PCollection<Object, String> fit in? And is the S3 write part of the same Pipeline instance? I'm wondering if the multiple FileSystems are confusing the planner w/respect to where it should create the temp file.

> Let me know if I can help debug further. As I mentioned, calling pipeline.cache() and pipeline.run() between the reduces did solve my problem, although I guess it is a hack.
>
> BTW Spotify's crunch-lib looks great, any integration plans?

I also really like it and would like to incorporate basically all of it; will start a thread on dev@ about it and see if David is up for it.

> -Danny


Re: Multiple Reduces in a Single Crunch Job

Posted by Josh Wills <jo...@gmail.com>.
I think it's a bug, or at least, a configuration issue. When you construct
the MRPipeline, are you explicitly passing it a Configuration object?
On Thu, Dec 4, 2014 at 6:24 PM Danny Morgan <un...@hotmail.com> wrote:

> Hi Josh,
>
> Sorry I mixed up pipelines there is no s3 write in this case.
>
> So you are correct the intermediate Avro file that's the output of the
> SecondarySort is labeled "/tmp" I don't manually create this local file,
> the crunch planner seems to insert that materialization phase in. If you
> refer back to my original email the error I get is:
>
> "org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path
> does not exist: hdfs:///tmp/crunch-1279941375/p1
> "
>
> So the dot plan has the file labeled as "/tmp/crunch-*" however when the
> job runs it's expecting to find an "hdfs:///tmp/crunch-*". Is this a
> labeling issue with the plan output or might this be the bug?
>
> -Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Thu, 4 Dec 2014 15:44:37 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Hey Danny,
>
> Inlined.
>
> On Thu, Dec 4, 2014 at 3:20 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> Hi Josh,
>
> Thanks for taking the time to look into this.
>
> I do get a PCollection<Object, String> and split it. I write the Avro
> objects as parquet to HDFS and I get the String collection and write it out
> to s3n://. I have noticed that the s3n:// targets copy their files to the
> local filesystem's /tmp and then copy the file up to s3. This process
> happens serially and is super slow, I'm not sure if it's a crunch issue or
> a general HDFS one.
>
>
> I'm not following; I'm referring to the second_phase.pdf plan file, which
> has a bunch of Avro inputs that are being merged together and secondary
> sorted (some sort of sessionization, I assume) followed by a
> GBK/combineValues and then the write to Parquet. Where does the
> PCollection<Object, String> fit in? And is the S3 write part of the same
> Pipeline instance? I'm wondering if the multiple FileSystems are confusing
> the planner w/respect to where it should create the temp file.
>
>
>
> Let me know if I can help debug further, as I mentioned calling
> pipeline.cache() and pipeline.run() between the reduces did solve my
> problem although I guess it is a hack.
>
> BTW Spotify's crunch-lib looks great, any integration plans?
>
>
> I also really like it and would like to incorporate basically all of it;
> will start a thread on dev@ about it and see if David is up for it.
>
>
>
> -Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Thu, 4 Dec 2014 14:21:55 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Danny,
>
> Spent a couple of hours today banging on this by hacking on some
> integration tests but couldn't replicate it. However, I just took a closer
> look at the plan you posted, and I noticed that all of the files you are
> writing out are prefixed w/ "hdfs:/" except for the /tmp/crunch-* file that
> Crunch is creating; is it possible that Crunch is creating the temp file
> locally on your client machine for some reason? I can't think of why that
> would happen off the top of my head, but if that is the problem, I'll at
> least be able to figure out where to look.
>
> Josh
>
>
> On Tue, Nov 25, 2014 at 6:30 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> No problem, Happy Thanksgiving!
>
> Gobble Gobble...
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 18:23:14 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Very useful-- thank you. Will dig into it and report back, although I'm
> heading out for the holiday so it likely won't be until early next week.
>
> J
>
> On Tue, Nov 25, 2014 at 6:16 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
>  Having a single pipeline in the application didn't fix it. Sticking a
> pipeline.run() in the middle also didn't matter either, the plan appears
> such that the planner is completely ignoring the second the run() I added.
>
> However what DOES WORK is if I do:
>
> collection = secondarySort()
> pipeline.cache(collection)
> pipeline.run()
> newcollection = collection.groupByKey()
>
> If I try adding the cache() without calling run() in between it doesn't
> work. Hope that's enough info for you to fix the possible planner bug.
>
> Thanks for the help Josh!
>
> ------------------------------
> From: unluckyboy@hotmail.com
> To: user@crunch.apache.org
> Subject: RE: Multiple Reduces in a Single Crunch Job
> Date: Wed, 26 Nov 2014 01:58:11 +0000
>
>
> I tried doing a Sample() instead of identity function, but that got fused
> into the reduce as well and didn't work.
>
> First thing I tried was sticking a pipeline.run() in between there and I
> was surprised but it didn't work either, same error. I'll rerun that config
> now and try to get the dot files for the plan.
>
> Not sure if this is affecting it but in the same crunch application I have
> a completely independent pipeline the runs before this one executes. I'll
> turn that off as well and see if it's causing the issue.
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 17:43:52 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Drat, I was hoping it was something simple. You could manually fix it by
> injecting a pipeline.run() call between the secondarySort and the
> groupByKey(), but of course, we'd like to handle this situation correctly
> by default.
>
> J
>
> On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> I did a parallelDo with the IdentityFn of the output of the secondarySort
> and the IdentityFn was just fused into the reduce phase of the
> secondarySort and I got the same error message.
>
> I think you want me to somehow force a map phase in between the two
> reduces?
>
> -Danny
>
> ------------------------------
> From: josh.wills@gmail.com
> Date: Tue, 25 Nov 2014 17:23:29 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Oh, dumb question-- if you put like a dummy function between the
> secondarySort and the groupByKey, like an IdentityFn or something, do
> things work again? That would help w/diagnosing the problem.
>
> On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <jw...@cloudera.com> wrote:
>
> So if you're getting it quickly, it might be b/c the job isn't recognizing
> the dependency between the two separate phases of the job for some reason
> (e.g., it's not realizing that one job has to be run before the other one.)
> That's an odd situation, but we have had bugs like that in the past; let me
> see if I can re-create the situation in an integration test. Which version
> of Crunch?
>
> J
>
> On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> No that's definitely not it. I get this issue if I write to a single
> output as well.
>
> If I remove the groupByKey().combineValues() line and just write out the
> output from the SecondarySort it works. Seems to only complain about the
> temp path not existing when I have multiple reduce phases in the pipeline.
> Also the error seems to happen immediately during the setup or planning
> phase, I assume this because the yarn jobs get created but they don't do
> anything, and instead of FAILED the error message is "Application killed by
> user."
>
> -Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 16:30:58 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Ack, sorry-- it's this: https://issues.apache.org/jira/browse/CRUNCH-481
> <https://issues.apache.org/jira/browse/CRUNCH-481-->
>
> On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> Hello Again Josh,
>
> The link to the Jira issue you sent out seems to be cut off, could you
> please resend it?
>
> I deleted the line where I write the collection to a text file, and
> retried it but it didn't work either. Also tried writing the collection out
> as Avro instead of Parquet, but got the same error.
>
> Here's the rest of the stacktrace:
>
> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
>         at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
>         at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
>         at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
>         at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
>         at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
>         at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
>         at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
>         at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
>         at java.lang.Thread.run(Thread.java:744)
>
> Thanks Josh!
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 16:10:33 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
>
> Hey Danny,
>
> I'm wondering if this is caused by
> https://issues.apache.org/jira/browse/CRUNCH-481-- I think we use
> different output committers for text files vs. parquet files, so at least
> one of the outputs won't be written properly-- does that make sense?
>
> Josh
>
> On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> Hi Crunchers,
>
> I've attached a pdf of what my plan looks like. I've run into this problem
> before where I have multiple reduce steps chained together in a single
> pipeline and always get the same error.
>
> In the case of the attached pdf the error is "org.apache.hadoop.mapreduce.lib.input.InvalidInputException:
> Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
>
> That's the temp directory the crunch planner set up for the first reduce
> phase.
>
> Can I run multiple chained reduces within the same pipeline? Do I have to
> manually write out the output from the first reduce?
>
> Here's what the code looks like:
>
>       // Simple mapper
>       PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
>       // Secondary sort happens here
>       PTable<Danny, Long> second = Danny.extractDannys(first);
>       // Regular group by
>       PTable<Danny, Long> third =
>           second.groupByKey().combineValues(Aggregators.SUM_LONGS());
>       // Simple function that populates some fields in the Danny object
>       // with the aggregate results
>       PCollection<Pair<Danny, String>> done = Danny.finalize(third);
>       Pair<PCollection<Danny>, PCollection<String>> splits =
>           Channels.split(done);
>       splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
>       Target pq_danny = new AvroParquetFileTarget(pqPath);
>       splits.first().write(pq_danny, WriteMode.OVERWRITE);
>
> Thanks!
>
> -Danny
>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>

RE: Multiple Reduces in a Single Crunch Job

Posted by Danny Morgan <un...@hotmail.com>.
Hi Josh,

Sorry, I mixed up pipelines; there is no S3 write in this case.

So you are correct: the intermediate Avro file that's the output of the SecondarySort is labeled "/tmp". I don't manually create this local file; the Crunch planner seems to insert that materialization phase. If you refer back to my original email, the error I get is:

"org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"

So the dot plan has the file labeled as "/tmp/crunch-*"; however, when the job runs it expects to find "hdfs:///tmp/crunch-*". Is this a labeling issue with the plan output, or might this be the bug?

-Danny
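
To make the labeling question above concrete, here is a minimal Java sketch of how one scheme-less path such as /tmp/crunch-* can land on different filesystems depending on the default filesystem it is resolved against. It uses only java.net.URI, not Hadoop or Crunch, and it is not how Crunch itself computes its temp path. The class name, the qualify helper and the namenode:8020 address are all made up for illustration; in Hadoop the default filesystem normally comes from the fs.defaultFS setting.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Toy model of path qualification; does not use Hadoop or Crunch. */
final class PathQualifySketch {

    /**
     * Returns the path unchanged if it already names a scheme; otherwise
     * borrows the scheme and authority of the (hypothetical) default filesystem.
     */
    static URI qualify(String path, URI defaultFs) {
        URI raw = URI.create(path);
        if (raw.getScheme() != null) {
            return raw;
        }
        try {
            return new URI(defaultFs.getScheme(), defaultFs.getAuthority(),
                           raw.getPath(), null, null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot qualify " + path, e);
        }
    }

    public static void main(String[] args) {
        String tmp = "/tmp/crunch-1279941375/p1";
        // Same label, two different default filesystems (both URIs are assumptions).
        System.out.println(qualify(tmp, URI.create("hdfs://namenode:8020")));
        System.out.println(qualify(tmp, URI.create("file:///")));
    }
}
```

If the client and the cluster disagree about the default filesystem, a bare "/tmp/crunch-*" label in the plan is ambiguous in just this way, which is why the fully qualified form in the error message is the more informative of the two.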

From: jwills@cloudera.com
Date: Thu, 4 Dec 2014 15:44:37 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Hey Danny,
Inlined.
On Thu, Dec 4, 2014 at 3:20 PM, Danny Morgan <un...@hotmail.com> wrote:



> Hi Josh,
>
> Thanks for taking the time to look into this.
>
> I do get a PCollection<Object, String> and split it. I write the Avro objects as parquet to HDFS and I get the String collection and write it out to s3n://. I have noticed that the s3n:// targets copy their files to the local filesystem's /tmp and then copy the file up to s3. This process happens serially and is super slow, I'm not sure if it's a crunch issue or a general HDFS one.

I'm not following; I'm referring to the second_phase.pdf plan file, which has a bunch of Avro inputs that are being merged together and secondary sorted (some sort of sessionization, I assume) followed by a GBK/combineValues and then the write to Parquet. Where does the PCollection<Object, String> fit in? And is the S3 write part of the same Pipeline instance? I'm wondering if the multiple FileSystems are confusing the planner w/respect to where it should create the temp file.

> Let me know if I can help debug further, as I mentioned calling pipeline.cache() and pipeline.run() between the reduces did solve my problem although I guess it is a hack.
>
> BTW Spotify's crunch-lib looks great, any integration plans?

I also really like it and would like to incorporate basically all of it; will start a thread on dev@ about it and see if David is up for it.

> -Danny

From: jwills@cloudera.com
Date: Thu, 4 Dec 2014 14:21:55 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Danny,
Spent a couple of hours today banging on this by hacking on some integration tests but couldn't replicate it. However, I just took a closer look at the plan you posted, and I noticed that all of the files you are writing out are prefixed w/ "hdfs:/" except for the /tmp/crunch-* file that Crunch is creating; is it possible that Crunch is creating the temp file locally on your client machine for some reason? I can't think of why that would happen off the top of my head, but if that is the problem, I'll at least be able to figure out where to look.
Josh

On Tue, Nov 25, 2014 at 6:30 PM, Danny Morgan <un...@hotmail.com> wrote:



No problem, Happy Thanksgiving!
Gobble Gobble...

From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 18:23:14 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Very useful-- thank you. Will dig into it and report back, although I'm heading out for the holiday so it likely won't be until early next week.
J
On Tue, Nov 25, 2014 at 6:16 PM, Danny Morgan <un...@hotmail.com> wrote:






Having a single pipeline in the application didn't fix it. Sticking a pipeline.run() in the middle didn't matter either; the plan makes it look as though the planner is completely ignoring the second run() I added.
However what DOES WORK is if I do:
collection = secondarySort()
pipeline.cache(collection)
pipeline.run()
newcollection = collection.groupByKey()
If I try adding the cache() without calling run() in between it doesn't work. Hope that's enough info for you to fix the possible planner bug.
Thanks for the help Josh!
From: unluckyboy@hotmail.com
To: user@crunch.apache.org
Subject: RE: Multiple Reduces in a Single Crunch Job
Date: Wed, 26 Nov 2014 01:58:11 +0000




I tried doing a Sample() instead of identity function, but that got fused into the reduce as well and didn't work.
First thing I tried was sticking a pipeline.run() in between there and I was surprised but it didn't work either, same error. I'll rerun that config now and try to get the dot files for the plan.
Not sure if this is affecting it, but in the same Crunch application I have a completely independent pipeline that runs before this one executes. I'll turn that off as well and see if it's causing the issue.

From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 17:43:52 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Drat, I was hoping it was something simple. You could manually fix it by injecting a pipeline.run() call between the secondarySort and the groupByKey(), but of course, we'd like to handle this situation correctly by default.
J
On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <un...@hotmail.com> wrote:



I did a parallelDo with the IdentityFn of the output of the secondarySort and the IdentityFn was just fused into the reduce phase of the secondarySort and I got the same error message.
I think you want me to somehow force a map phase in between the two reduces?
-Danny

From: josh.wills@gmail.com
Date: Tue, 25 Nov 2014 17:23:29 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Oh, dumb question-- if you put like a dummy function between the secondarySort and the groupByKey, like an IdentityFn or something, do things work again? That would help w/diagnosing the problem.
On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <jw...@cloudera.com> wrote:
So if you're getting it quickly, it might be b/c the job isn't recognizing the dependency between the two separate phases of the job for some reason (e.g., it's not realizing that one job has to be run before the other one.) That's an odd situation, but we have had bugs like that in the past; let me see if I can re-create the situation in an integration test. Which version of Crunch?
J
On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <un...@hotmail.com> wrote:



No that's definitely not it. I get this issue if I write to a single output as well.
If I remove the groupByKey().combineValues() line and just write out the output from the SecondarySort it works. Seems to only complain about the temp path not existing when I have multiple reduce phases in the pipeline. Also the error seems to happen immediately during the setup or planning phase, I assume this because the yarn jobs get created but they don't do anything, and instead of FAILED the error message is "Application killed by user."
-Danny

From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 16:30:58 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Ack, sorry-- it's this: https://issues.apache.org/jira/browse/CRUNCH-481
On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <un...@hotmail.com> wrote:



Hello Again Josh,
The link to the Jira issue you sent out seems to be cut off, could you please resend it?
I deleted the line where I write the collection to a text file, and retried it but it didn't work either. Also tried writing the collection out as Avro instead of Parquet, but got the same error.
Here's the rest of the stacktrace:
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
        at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
        at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
        at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
        at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
        at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
        at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
        at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
        at java.lang.Thread.run(Thread.java:744)
Thanks Josh!
From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 16:10:33 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Hey Danny,
I'm wondering if this is caused by https://issues.apache.org/jira/browse/CRUNCH-481-- I think we use different output committers for text files vs. parquet files, so at least one of the outputs won't be written properly-- does that make sense?
Josh
On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <un...@hotmail.com> wrote:



Hi Crunchers,
I've attached a pdf of what my plan looks like. I've run into this problem before where I have multiple reduce steps chained together in a single pipeline and always get the same error.
In the case of the attached pdf the error is "org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
That's the temp directory the crunch planner set up for the first reduce phase.
Can I run multiple chained reduces within the same pipeline? Do I have to manually write out the output from the first reduce?
Here's what the code looks like:
      // Simple mapper
      PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
      // Secondary sort happens here
      PTable<Danny, Long> second = Danny.extractDannys(first);
      // Regular group by
      PTable<Danny, Long> third = second.groupByKey().combineValues(Aggregators.SUM_LONGS());
      // Simple function that populates some fields in the Danny object with the aggregate results
      PCollection<Pair<Danny, String>> done = Danny.finalize(third);
      Pair<PCollection<Danny>, PCollection<String>> splits = Channels.split(done);
      splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
      Target pq_danny = new AvroParquetFileTarget(pqPath);
      splits.first().write(pq_danny, WriteMode.OVERWRITE);
Thanks!
-Danny


-- 
Director of Data Science
Cloudera
Twitter: @josh_wills

Re: Multiple Reduces in a Single Crunch Job

Posted by Josh Wills <jw...@cloudera.com>.
Hey Danny,

Inlined.

On Thu, Dec 4, 2014 at 3:20 PM, Danny Morgan <un...@hotmail.com> wrote:

> Hi Josh,
>
> Thanks for taking the time to look into this.
>
> I do get a PCollection<Object, String> and split it. I write the Avro
> objects as parquet to HDFS and I get the String collection and write it out
> to s3n://. I have noticed that the s3n:// targets copy their files to the
> local filesystem's /tmp and then copy the file up to s3. This process
> happens serially and is super slow, I'm not sure if it's a crunch issue or
> a general HDFS one.
>

I'm not following; I'm referring to the second_phase.pdf plan file, which
has a bunch of Avro inputs that are being merged together and secondary
sorted (some sort of sessionization, I assume) followed by a
GBK/combineValues and then the write to Parquet. Where does the
PCollection<Object, String> fit in? And is the S3 write part of the same
Pipeline instance? I'm wondering if the multiple FileSystems are confusing
the planner w/respect to where it should create the temp file.


>
> Let me know if I can help debug further, as I mentioned calling
> pipeline.cache() and pipeline.run() between the reduces did solve my
> problem although I guess it is a hack.
>
> BTW Spotify's crunch-lib looks great, any integration plans?
>

I also really like it and would like to incorporate basically all of it;
will start a thread on dev@ about it and see if David is up for it.


>
> -Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Thu, 4 Dec 2014 14:21:55 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Danny,
>
> Spent a couple of hours today banging on this by hacking on some
> integration tests but couldn't replicate it. However, I just took a closer
> look at the plan you posted, and I noticed that all of the files you are
> writing out are prefixed w/ "hdfs:/" except for the /tmp/crunch-* file that
> Crunch is creating; is it possible that Crunch is creating the temp file
> locally on your client machine for some reason? I can't think of why that
> would happen off the top of my head, but if that is the problem, I'll at
> least be able to figure out where to look.
>
> Josh
>
>
> On Tue, Nov 25, 2014 at 6:30 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> No problem, Happy Thanksgiving!
>
> Gobble Gobble...
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 18:23:14 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Very useful-- thank you. Will dig into it and report back, although I'm
> heading out for the holiday so it likely won't be until early next week.
>
> J
>
> On Tue, Nov 25, 2014 at 6:16 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> Having a single pipeline in the application didn't fix it. Sticking a
> pipeline.run() in the middle didn't matter either; the plan makes it look as
> though the planner is completely ignoring the second run() I added.
>
> However what DOES WORK is if I do:
>
> collection = secondarySort()
> pipeline.cache(collection)
> pipeline.run()
> newcollection = collection.groupByKey()
>
> If I try adding the cache() without calling run() in between it doesn't
> work. Hope that's enough info for you to fix the possible planner bug.
>
> Thanks for the help Josh!
>
> ------------------------------
> From: unluckyboy@hotmail.com
> To: user@crunch.apache.org
> Subject: RE: Multiple Reduces in a Single Crunch Job
> Date: Wed, 26 Nov 2014 01:58:11 +0000
>
>
> I tried doing a Sample() instead of identity function, but that got fused
> into the reduce as well and didn't work.
>
> First thing I tried was sticking a pipeline.run() in between there and I
> was surprised but it didn't work either, same error. I'll rerun that config
> now and try to get the dot files for the plan.
>
> Not sure if this is affecting it but in the same crunch application I have
> a completely independent pipeline that runs before this one executes. I'll
> turn that off as well and see if it's causing the issue.
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 17:43:52 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Drat, I was hoping it was something simple. You could manually fix it by
> injecting a pipeline.run() call between the secondarySort and the
> groupByKey(), but of course, we'd like to handle this situation correctly
> by default.
>
> J
>
> On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> I did a parallelDo with the IdentityFn of the output of the secondarySort
> and the IdentityFn was just fused into the reduce phase of the
> secondarySort and I got the same error message.
>
> I think you want me to somehow force a map phase in between the two
> reduces?
>
> -Danny
>
> ------------------------------
> From: josh.wills@gmail.com
> Date: Tue, 25 Nov 2014 17:23:29 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Oh, dumb question-- if you put like a dummy function between the
> secondarySort and the groupByKey, like an IdentityFn or something, do
> things work again? That would help w/diagnosing the problem.
>
> On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <jw...@cloudera.com> wrote:
>
> So if you're getting it quickly, it might be b/c the job isn't recognizing
> the dependency between the two separate phases of the job for some reason
> (e.g., it's not realizing that one job has to be run before the other one.)
> That's an odd situation, but we have had bugs like that in the past; let me
> see if I can re-create the situation in an integration test. Which version
> of Crunch?
>
> J
>
> On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> No that's definitely not it. I get this issue if I write to a single
> output as well.
>
> If I remove the groupByKey().combineValues() line and just write out the
> output from the SecondarySort it works. Seems to only complain about the
> temp path not existing when I have multiple reduce phases in the pipeline.
> Also the error seems to happen immediately during the setup or planning
> phase, I assume this because the yarn jobs get created but they don't do
> anything, and instead of FAILED the error message is "Application killed by
> user."
>
> -Danny
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 16:30:58 -0800
>
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
> Ack, sorry-- it's this: https://issues.apache.org/jira/browse/CRUNCH-481
> <https://issues.apache.org/jira/browse/CRUNCH-481-->
>
> On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> Hello Again Josh,
>
> The link to the Jira issue you sent out seems to be cut off, could you
> please resend it?
>
> I deleted the line where I write the collection to a text file, and
> retried it but it didn't work either. Also tried writing the collection out
> as Avro instead of Parquet, but got the same error.
>
> Here's the rest of the stacktrace:
>
> org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
>         at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
>         at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
>         at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
>         at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
>         at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
>         at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
>         at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
>         at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
>         at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
>         at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
>         at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
>         at java.lang.Thread.run(Thread.java:744)
>
> Thanks Josh!
>
> ------------------------------
> From: jwills@cloudera.com
> Date: Tue, 25 Nov 2014 16:10:33 -0800
> Subject: Re: Multiple Reduces in a Single Crunch Job
> To: user@crunch.apache.org
>
>
> Hey Danny,
>
> I'm wondering if this is caused by
> https://issues.apache.org/jira/browse/CRUNCH-481-- I think we use
> different output committers for text files vs. parquet files, so at least
> one of the outputs won't be written properly-- does that make sense?
>
> Josh
>
> On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <un...@hotmail.com>
> wrote:
>
> Hi Crunchers,
>
> I've attached a pdf of what my plan looks like. I've run into this problem
> before where I have multiple reduce steps chained together in a single
> pipeline and always get the same error.
>
> In the case of the attached pdf the error is "org.apache.hadoop.mapreduce.lib.input.InvalidInputException:
> Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
>
> That's the temp directory the crunch planner set up for the first reduce
> phase.
>
> Can I run multiple chained reduces within the same pipeline? Do I have to
> manually write out the output from the first reduce?
>
> Here's what the code looks like:
>
>       // Simple mapper
>       PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
>       // Secondary sort happens here
>       PTable<Danny, Long> second = Danny.extractDannys(first);
>       // Regular group by
>       PTable<Danny, Long> third =
>           second.groupByKey().combineValues(Aggregators.SUM_LONGS());
>       // Simple function that populates some fields in the Danny object
>       // with the aggregate results
>       PCollection<Pair<Danny, String>> done = Danny.finalize(third);
>       Pair<PCollection<Danny>, PCollection<String>> splits =
>           Channels.split(done);
>       splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
>       Target pq_danny = new AvroParquetFileTarget(pqPath);
>       splits.first().write(pq_danny, WriteMode.OVERWRITE);
>
> Thanks!
>
> -Danny
>
>
>
>
> --
> Director of Data Science
> Cloudera <http://www.cloudera.com>
> Twitter: @josh_wills <http://twitter.com/josh_wills>



-- 
Director of Data Science
Cloudera <http://www.cloudera.com>
Twitter: @josh_wills <http://twitter.com/josh_wills>

RE: Multiple Reduces in a Single Crunch Job

Posted by Danny Morgan <un...@hotmail.com>.
Hi Josh,

Thanks for taking the time to look into this.

I do get a PCollection<Object, String> and split it. I write the Avro objects as Parquet to HDFS, and I write the String collection out to s3n://. I have noticed that the s3n:// targets copy their files to the local filesystem's /tmp and then copy the files up to S3. This process happens serially and is super slow; I'm not sure if it's a Crunch issue or a general HDFS one.

Let me know if I can help debug further. As I mentioned, calling pipeline.cache() and pipeline.run() between the reduces did solve my problem, although I guess it is a hack.

BTW Spotify's crunch-lib looks great, any integration plans?

-Danny
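
One guess raised earlier in this thread is that the second job was being started without waiting for the first. The toy Java model below shows why that would produce an "Input path does not exist" error on a temp path, and why forcing the first stage to finish before the second is submitted (which is what cache() followed by run() between the reduces amounts to) would avoid it. It is only a sketch: it is not Crunch's planner or executor, and every name in it (ChainedJobsSketch, Job, the /tmp/crunch-example/p1 path) is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of two chained jobs sharing a temp path; not Crunch's planner. */
final class ChainedJobsSketch {

    /** A job reads one path and writes one path. */
    static final class Job {
        final String name;
        final String input;
        final String output;

        Job(String name, String input, String output) {
            this.name = name;
            this.input = input;
            this.output = output;
        }
    }

    /** Paths that currently exist on the pretend filesystem. */
    final Set<String> existing = new HashSet<>();

    /** Checks the input up front, much as input splits are computed at submit time. */
    void submit(Job job) {
        if (!existing.contains(job.input)) {
            throw new IllegalStateException("Input path does not exist: " + job.input);
        }
        existing.add(job.output); // pretend the job ran to completion
    }

    /** Starts only jobs whose input already exists; returns the order they ran in. */
    List<String> runRespectingDependencies(List<Job> jobs) {
        List<Job> pending = new ArrayList<>(jobs);
        List<String> order = new ArrayList<>();
        while (!pending.isEmpty()) {
            boolean progressed = false;
            for (Job job : new ArrayList<>(pending)) {
                if (existing.contains(job.input)) {
                    submit(job);
                    pending.remove(job);
                    order.add(job.name);
                    progressed = true;
                }
            }
            if (!progressed) {
                throw new IllegalStateException("no runnable job; missing inputs");
            }
        }
        return order;
    }

    public static void main(String[] args) {
        ChainedJobsSketch fs = new ChainedJobsSketch();
        fs.existing.add("/logs");
        Job sort = new Job("secondarySort", "/logs", "/tmp/crunch-example/p1");
        Job group = new Job("groupByKey", "/tmp/crunch-example/p1", "/out");
        // Listing the jobs in the wrong order is fine as long as dependencies are honored.
        List<Job> jobs = new ArrayList<>();
        jobs.add(group);
        jobs.add(sort);
        System.out.println(fs.runRespectingDependencies(jobs));
    }
}
```

Calling submit on the second job directly, before the first has written its output, fails in the same place the stack trace in this thread does: while the input is being listed at submission time.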

From: jwills@cloudera.com
Date: Thu, 4 Dec 2014 14:21:55 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Danny,
Spent a couple of hours today banging on this by hacking on some integration tests but couldn't replicate it. However, I just took a closer look at the plan you posted, and I noticed that all of the files you are writing out are prefixed w/ "hdfs:/" except for the /tmp/crunch-* file that Crunch is creating; is it possible that Crunch is creating the temp file locally on your client machine for some reason? I can't think of why that would happen off the top of my head, but if that is the problem, I'll at least be able to figure out where to look.
Josh

On Tue, Nov 25, 2014 at 6:30 PM, Danny Morgan <un...@hotmail.com> wrote:



No problem, Happy Thanksgiving!
Gobble Gobble...

From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 18:23:14 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Very useful-- thank you. Will dig into it and report back, although I'm heading out for the holiday so it likely won't be until early next week.
J
On Tue, Nov 25, 2014 at 6:16 PM, Danny Morgan <un...@hotmail.com> wrote:






Having a single pipeline in the application didn't fix it. Sticking a pipeline.run() in the middle didn't matter either; the plan makes it look as though the planner is completely ignoring the second run() I added.
However what DOES WORK is if I do:
collection = secondarySort()
pipeline.cache(collection)
pipeline.run()
newcollection = collection.groupByKey()
If I try adding the cache() without calling run() in between it doesn't work. Hope that's enough info for you to fix the possible planner bug.
Thanks for the help Josh!
From: unluckyboy@hotmail.com
To: user@crunch.apache.org
Subject: RE: Multiple Reduces in a Single Crunch Job
Date: Wed, 26 Nov 2014 01:58:11 +0000




I tried doing a Sample() instead of identity function, but that got fused into the reduce as well and didn't work.
First thing I tried was sticking a pipeline.run() in between there and I was surprised but it didn't work either, same error. I'll rerun that config now and try to get the dot files for the plan.
Not sure if this is affecting it, but in the same Crunch application I have a completely independent pipeline that runs before this one executes. I'll turn that off as well and see if it's causing the issue.

From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 17:43:52 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Drat, I was hoping it was something simple. You could manually fix it by injecting a pipeline.run() call between the secondarySort and the groupByKey(), but of course, we'd like to handle this situation correctly by default.
J
On Tue, Nov 25, 2014 at 5:39 PM, Danny Morgan <un...@hotmail.com> wrote:



I did a parallelDo with the IdentityFn of the output of the secondarySort and the IdentityFn was just fused into the reduce phase of the secondarySort and I got the same error message.
I think you want me to somehow force a map phase in between the two reduces?
-Danny

From: josh.wills@gmail.com
Date: Tue, 25 Nov 2014 17:23:29 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Oh, dumb question-- if you put like a dummy function between the secondarySort and the groupByKey, like an IdentityFn or something, do things work again? That would help w/diagnosing the problem.
On Tue, Nov 25, 2014 at 5:15 PM, Josh Wills <jw...@cloudera.com> wrote:
So if you're getting it quickly, it might be b/c the job isn't recognizing the dependency between the two separate phases of the job for some reason (e.g., it's not realizing that one job has to be run before the other one.) That's an odd situation, but we have had bugs like that in the past; let me see if I can re-create the situation in an integration test. Which version of Crunch?
J
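
The failure mode described above can be sketched with a toy model in plain Java, with no Hadoop or Crunch involved. StageOrderSketch and its methods are hypothetical names. The sketch assumes, as the stack trace quoted later in this thread suggests (the exception comes from FileInputFormat.listStatus during JobSubmitter.writeSplits), that the input path is checked when a job is submitted. Under that assumption, a downstream job submitted before its upstream job has written the temp directory fails immediately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Toy model only: local files stand in for HDFS. All names are hypothetical.
final class StageOrderSketch {

    // Creates a scratch directory standing in for /tmp/crunch-NNNN.
    static Path newTempDir() {
        try {
            return Files.createTempDirectory("crunch-sketch");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for the first reduce job: writes its output under tmp/p1.
    static void runFirstStage(Path tmp) {
        try {
            Path out = tmp.resolve("p1");
            Files.createDirectories(out);
            Files.write(out.resolve("part-r-00000"),
                        "danny\t3\n".getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for the submit-time input check of the second job.
    static boolean canSubmitSecondStage(Path tmp) {
        return Files.isDirectory(tmp.resolve("p1"));
    }

    public static void main(String[] args) {
        Path tmp = newTempDir();
        // Dependency ignored: the second job is submitted first.
        System.out.println("before first stage: " + canSubmitSecondStage(tmp));
        runFirstStage(tmp);
        // Dependency respected: the first job has finished.
        System.out.println("after first stage: " + canSubmitSecondStage(tmp));
    }
}
```

If the scheduler respects the dependency, the check only ever happens after runFirstStage has completed; the workaround of calling pipeline.run() between the two reduces is one way of forcing that ordering by hand.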
On Tue, Nov 25, 2014 at 4:40 PM, Danny Morgan <un...@hotmail.com> wrote:



No that's definitely not it. I get this issue if I write to a single output as well.
If I remove the groupByKey().combineValues() line and just write out the output from the SecondarySort, it works. It only seems to complain about the temp path not existing when I have multiple reduce phases in the pipeline. Also, the error seems to happen immediately, during the setup or planning phase. I assume this because the YARN jobs get created but don't do anything, and instead of FAILED the status message is "Application killed by user."
-Danny

From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 16:30:58 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Ack, sorry-- it's this: https://issues.apache.org/jira/browse/CRUNCH-481
On Tue, Nov 25, 2014 at 4:24 PM, Danny Morgan <un...@hotmail.com> wrote:



Hello Again Josh,
The link to the Jira issue you sent out seems to be cut off, could you please resend it?
I deleted the line where I write the collection to a text file, and retried it but it didn't work either. Also tried writing the collection out as Avro instead of Parquet, but got the same error.
Here's the rest of the stack trace:
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-2008950085/p1
        at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:285)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:217)
        at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:65)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:491)
        at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:508)
        at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:392)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
        at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1265)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1528)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
        at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.submit(CrunchControlledJob.java:340)
        at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.startReadyJobs(CrunchJobControl.java:277)
        at org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl.pollJobStatusAndStartNewOnes(CrunchJobControl.java:316)
        at org.apache.crunch.impl.mr.exec.MRExecutor.monitorLoop(MRExecutor.java:113)
        at org.apache.crunch.impl.mr.exec.MRExecutor.access$000(MRExecutor.java:55)
        at org.apache.crunch.impl.mr.exec.MRExecutor$1.run(MRExecutor.java:84)
        at java.lang.Thread.run(Thread.java:744)
Thanks Josh!
From: jwills@cloudera.com
Date: Tue, 25 Nov 2014 16:10:33 -0800
Subject: Re: Multiple Reduces in a Single Crunch Job
To: user@crunch.apache.org

Hey Danny,
I'm wondering if this is caused by https://issues.apache.org/jira/browse/CRUNCH-481 -- I think we use different output committers for text files vs. parquet files, so at least one of the outputs won't be written properly-- does that make sense?
Josh
On Tue, Nov 25, 2014 at 4:07 PM, Danny Morgan <un...@hotmail.com> wrote:



Hi Crunchers,
I've attached a PDF of what my plan looks like. I've run into this problem before: whenever I have multiple reduce steps chained together in a single pipeline, I always get the same error.
In the case of the attached PDF the error is "org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs:///tmp/crunch-1279941375/p1"
That's the temp directory the Crunch planner set up for the first reduce phase.
Can I run multiple chained reduces within the same pipeline? Do I have to manually write out the output from the first reduce?
Here's what the code looks like:
      // Simple mapper
      PTable<String, Pair<Long, Log>> first = Danny.filterForDanny(logs);
      // Secondary sort happens here
      PTable<Danny, Long> second = Danny.extractDannys(first);
      // Regular group by
      PTable<Danny, Long> third = second.groupByKey().combineValues(Aggregators.SUM_LONGS());
      // Simple function that populates some fields in the Danny object with the aggregate results
      PCollection<Pair<Danny, String>> done = Danny.finalize(third);
      Pair<PCollection<Danny>, PCollection<String>> splits = Channels.split(done);
      splits.second().write(To.textFile(mypath), WriteMode.OVERWRITE);
      Target pq_danny = new AvroParquetFileTarget(pqPath);
      splits.first().write(pq_danny, WriteMode.OVERWRITE);
Thanks!
-Danny


-- 
Director of Data Science
Cloudera
Twitter: @josh_wills
 		 	   		  

