Posted to dev@pig.apache.org by Alex Bain <am...@gmail.com> on 2013/12/18 04:53:28 UTC

Review Request 16309: PIG-3629 Implement STREAM operator in Tez

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16309/
-----------------------------------------------------------

Review request for pig, Cheolsoo Park, Daniel Dai, Mark Wagner, and Rohini Palaniswamy.


Bugs: PIG-3629
    https://issues.apache.org/jira/browse/PIG-3629


Repository: pig-git


Description
-------

Implement STREAM operator in Tez - https://issues.apache.org/jira/browse/PIG-3629

In this patch, I do not add resources to pig-misc.jar, I just add them individually. See my discussion post: https://groups.google.com/forum/#!topic/pig-on-tez/8S80GMKhMaU

Basic Changes:
-Run the PhyPlanSetter and EndOfAllInputSetter to set the parent plan and the end-of-all input flags necessary for STREAM, just like in MR Pig.
-Add a map to hold plan-specific extra local resources in TezOperPlan.java. These resources can either come from the user's directory (e.g. SHIP('/home/abain/foo')) or from HDFS (e.g. CACHE('/user/abain/bar') in HDFS).
-Add the new class TezPOStreamVisitor that assembles all the plan-specific local resources that get added in TezOperPlan.java.

Resource Manager Changes:
-TezResourceManager resources were previously a map of java.net.URL -> Path in HDFS. Previously, the URLs were all local files, e.g. file://home/abain/pig-withouthadoop.jar. However, the CACHE statement requires that resources already present in HDFS can be added as local resources. Unfortunately, java.net.URL does not support hdfs:// URLs, so I changed this data structure to use YARN URLs instead. I also added methods to the resource manager to distinguish whether you are adding a local resource or a resource already present in HDFS.
-CACHE also supports URLs with a fragment at the end, which becomes a "shortcut" for the name, e.g. CACHE(/input/big-data-name.gz#data.gz). I changed the resource manager to look for a fragment and use it as the resource name (if the fragment exists). This results in the symlink to the resource being created with the fragment name, which is what we want.
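
The fragment handling described above can be sketched with java.net.URI alone. This is a minimal illustration, not the code in the patch: the class and method names (CacheResourceName, resourceName) are hypothetical, and it assumes that a CACHE argument without a fragment is named after the last segment of its path.

```java
import java.net.URI;

final class CacheResourceName {
    // Hypothetical helper: pick the name under which a CACHE'd file is
    // symlinked. Uses the URI fragment when present, otherwise the last
    // segment of the path (an assumption made for this sketch).
    static String resourceName(String cacheSpec) {
        URI uri = URI.create(cacheSpec);
        String fragment = uri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = uri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        System.out.println(resourceName("/input/big-data-name.gz#data.gz")); // data.gz
        System.out.println(resourceName("/user/abain/bar"));                 // bar
    }
}
```

Letting URI split off the fragment avoids searching the string for '#' by hand.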


Diffs
-----

  src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java 37566ab 
  src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java 7a1736a 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java 2584501 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java 96ccdde 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java 135b933 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java 0cc8e17 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java 673fd70 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java 0fd7575 

Diff: https://reviews.apache.org/r/16309/diff/


Testing
-------

Added a unit test to TestTezCompiler.java
Added an e2e test to tez.conf

ant test-tez passes
ant test-e2e-tez has three failures - I am investigating to see if they are related, or perhaps just transient

Question: There is already a separate suite of STREAM tests in streaming.conf. Maybe I should remove my e2e test and we should add streaming.conf as a dependency to the test-e2e-tez target? I haven't tried to run streaming.conf yet.


Thanks,

Alex Bain


Re: Review Request 16309: PIG-3629 Implement STREAM operator in Tez

Posted by Rohini Palaniswamy <ro...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16309/#review34604
-----------------------------------------------------------

Ship it!



- Rohini Palaniswamy


On Jan. 10, 2014, 10:35 p.m., Alex Bain wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/16309/
> -----------------------------------------------------------
> 
> (Updated Jan. 10, 2014, 10:35 p.m.)
> 
> 
> Review request for pig, Cheolsoo Park, Daniel Dai, Mark Wagner, and Rohini Palaniswamy.
> 
> 
> Bugs: PIG-3629
>     https://issues.apache.org/jira/browse/PIG-3629
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Implement STREAM operator in Tez - https://issues.apache.org/jira/browse/PIG-3629
> 
> In this patch, I do not add resources to pig-misc.jar, I just add them individually. See my discussion post: https://groups.google.com/forum/#!topic/pig-on-tez/8S80GMKhMaU
> 
> Basic Changes:
> -Run the PhyPlanSetter and EndOfAllInputSetter to set the parent plan and the end-of-all input flags necessary for STREAM, just like in MR Pig.
> -Add a map to hold plan-specific extra local resources in TezOperPlan.java. These resources can either come from the user's directory (e.g. SHIP('/home/abain/foo')) or from HDFS (e.g. CACHE('/user/abain/bar') in HDFS).
> -Add the new class TezPOStreamVisitor that assembles all the plan-specific local resources that get added in TezOperPlan.java.
> 
> Resource Manager Changes:
> -TezResourceManager resources were previously a map of java.net.URL -> Path in HDFS. Previously, the URLs were all local files, e.g. file://home/abain/pig-withouthadoop.jar. However, the CACHE statement requires that resources already present in HDFS can be added as local resources. Unfortunately, java.net.URL does not support hdfs:// URLs, so I changed this data structure to use YARN URLs instead. I also added methods to the resource manager to distinguish whether you are adding a local resource or a resource already present in HDFS.
> -CACHE also supports URLs with a fragment at the end, which becomes a "shortcut" for the name, e.g. CACHE(/input/big-data-name.gz#data.gz). I changed the resource manager to look for a fragment and use it as the resource name (if the fragment exists). This results in the symlink to the resource being created with the fragment name, which is what we want.
> 
> Race condition:
> -I found a race condition that resulted from reusing the Result object in POSimpleTezLoad. There are several possible solutions. After discussing in the newsgroup, we decided to change POSimpleTezLoad for now.
> -I also made a small cleanup to PhysicalOperator.java.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java 28a110a 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java 3e6ec7b 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java 7342dab 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java e28de47 
> 
> Diff: https://reviews.apache.org/r/16309/diff/
> 
> 
> Testing
> -------
> 
> Added a unit test to TestTezCompiler.java
> Added a new e2e test to tez.conf with session reuse enabled
> Ported three other e2e tests from streaming.conf to tez.conf to increase coverage
> 
> ant test-tez passes
> ant test-e2e-tez passes
> Manually tested with a large subset of tests from streaming.conf (the ones using features currently supported by Pig-on-Tez), they pass
> 
> 
> Thanks,
> 
> Alex Bain
> 
>


Re: Review Request 16309: PIG-3629 Implement STREAM operator in Tez

Posted by Alex Bain <am...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16309/
-----------------------------------------------------------

(Updated Jan. 10, 2014, 2:35 p.m.)


Review request for pig, Cheolsoo Park, Daniel Dai, Mark Wagner, and Rohini Palaniswamy.


Changes
-------

Reopening for fixes to Rohini's issues


Bugs: PIG-3629
    https://issues.apache.org/jira/browse/PIG-3629


Repository: pig-git


Description
-------

Implement STREAM operator in Tez - https://issues.apache.org/jira/browse/PIG-3629

In this patch, I do not add resources to pig-misc.jar, I just add them individually. See my discussion post: https://groups.google.com/forum/#!topic/pig-on-tez/8S80GMKhMaU

Basic Changes:
-Run the PhyPlanSetter and EndOfAllInputSetter to set the parent plan and the end-of-all input flags necessary for STREAM, just like in MR Pig.
-Add a map to hold plan-specific extra local resources in TezOperPlan.java. These resources can either come from the user's directory (e.g. SHIP('/home/abain/foo')) or from HDFS (e.g. CACHE('/user/abain/bar') in HDFS).
-Add the new class TezPOStreamVisitor that assembles all the plan-specific local resources that get added in TezOperPlan.java.

Resource Manager Changes:
-TezResourceManager resources were previously a map of java.net.URL -> Path in HDFS. Previously, the URLs were all local files, e.g. file://home/abain/pig-withouthadoop.jar. However, the CACHE statement requires that resources already present in HDFS can be added as local resources. Unfortunately, java.net.URL does not support hdfs:// URLs, so I changed this data structure to use YARN URLs instead. I also added methods to the resource manager to distinguish whether you are adding a local resource or a resource already present in HDFS.
-CACHE also supports URLs with a fragment at the end, which becomes a "shortcut" for the name, e.g. CACHE(/input/big-data-name.gz#data.gz). I changed the resource manager to look for a fragment and use it as the resource name (if the fragment exists). This results in the symlink to the resource being created with the fragment name, which is what we want.

Race condition:
-I found a race condition that resulted from reusing the Result object in POSimpleTezLoad. There are several possible solutions. After discussing in the newsgroup, we decided to change POSimpleTezLoad for now.
-I also made a small cleanup to PhysicalOperator.java.
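
The general hazard behind the race condition item can be sketched as follows. This is not the Pig code: Result, ReusingLoader and FreshLoader are hypothetical stand-ins, and the sketch only shows why handing the same mutable object to every caller is fragile when an earlier result may still be in use (for example by another thread).

```java
final class ReusedResultSketch {
    // Hypothetical stand-in for an operator's result holder.
    static final class Result {
        Object value;
    }

    // Returns the same Result instance on every call and overwrites it.
    static final class ReusingLoader {
        private final Result shared = new Result();
        private int next = 0;

        Result getNext() {
            shared.value = next++;
            return shared;
        }
    }

    // Returns a new Result per call, so earlier results are never overwritten.
    static final class FreshLoader {
        private int next = 0;

        Result getNext() {
            Result r = new Result();
            r.value = next++;
            return r;
        }
    }

    public static void main(String[] args) {
        ReusingLoader reusing = new ReusingLoader();
        Result first = reusing.getNext();
        reusing.getNext();                 // overwrites what 'first' points to
        System.out.println(first.value);   // 1, not 0

        FreshLoader fresh = new FreshLoader();
        Result kept = fresh.getNext();
        fresh.getNext();
        System.out.println(kept.value);    // 0
    }
}
```

With a second thread consuming the earlier result, the overwrite in the reusing variant becomes timing-dependent, which is what makes it a race.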


Diffs (updated)
-----

  src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java 28a110a 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java 3e6ec7b 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java 7342dab 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java e28de47 

Diff: https://reviews.apache.org/r/16309/diff/


Testing
-------

Added a unit test to TestTezCompiler.java
Added a new e2e test to tez.conf with session reuse enabled
Ported three other e2e tests from streaming.conf to tez.conf to increase coverage

ant test-tez passes
ant test-e2e-tez passes
Manually tested with a large subset of tests from streaming.conf (the ones using features currently supported by Pig-on-Tez), they pass


Thanks,

Alex Bain


Re: Review Request 16309: PIG-3629 Implement STREAM operator in Tez

Posted by Alex Bain <am...@gmail.com>.

> On Dec. 26, 2013, 10:08 a.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java, line 66
> > <https://reviews.apache.org/r/16309/diff/3/?file=402932#file402932line66>
> >
> >     This check should be removed. You are allowed to ship from any FileSystem implementation to the destination. The shipToHDFS method in Utils is a misnomer. It ships to the fs.defaultFS. We should rename the method to shipToDefaultFS

In my fix, I no longer call the method Utils.shipToHDFS, which writes to a temporary directory. Instead, I write to the staging directory for the job using the remote FS, in the same manner that Daniel's original code writes the job.jar to the staging directory. All the resources will now be in the staging directory for the job (on the remote FS). This should be much better.


> On Dec. 26, 2013, 10:08 a.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java, lines 84-85
> > <https://reviews.apache.org/r/16309/diff/3/?file=402932#file402932line84>
> >
> >     We should not be having this check. The destination filesystem could be anything, e.g. s3. Can we rename all variables and methods in TezResourceManager and TezOperPlan that have hdfs in their names to, say, defaultFS, srcFS, or remoteFS as appropriate?

I believe I have this corrected now. For SHIP, though, the documentation seems to assume that the "from" filesystem is always the local filesystem, i.e. SHIP('/foo/bar') means /foo/bar is local somehow (even if it is perhaps a directory mounted from some other kind of filesystem).


> On Dec. 26, 2013, 10:08 a.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java, lines 87-88
> > <https://reviews.apache.org/r/16309/diff/3/?file=402929#file402929line87>
> >
> >     The implementation of these does not look right to me. It does not honor FileSystem and fs.defaultFS and assumes files are to be shipped to HDFS (local mode, s3, etc. won't work). This needs to be fixed.
> >     
> >     Also, whenever URL or URI conversion is required, use the Path class,
> >     i.e. new Path(filepath).toUri().toURL() wherever a path is required. The Path class takes care of normalizing the URL for the path and does some Windows-specific handling.

So just generally, there are two annoying problems that I think are the source of the very valid issues you raised:

1. You can't use java.net.URL with schemes like s3://, hdfs://, etc. These result in exceptions.

2. You can't do new Path(filepath).toUri().toURL(), unless the string filepath already has a scheme at the front of it supported by java.net.URL. For example, if filepath is "file://foo/bar" this will work, but if filepath is "/foo/bar" you will get an exception about not having an absolute URI.
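
Both problems can be reproduced with the JDK alone. In this sketch, URI.create stands in for Hadoop's new Path(filepath).toUri() (an assumption, so that the example needs no Hadoop classes); the class name UrlLimits, the host name namenode and the bucket name are hypothetical. It assumes a plain JVM with no extra URL stream handlers registered.

```java
import java.net.MalformedURLException;
import java.net.URI;

final class UrlLimits {
    // Tries to turn a string into a java.net.URL and reports what happened.
    static String tryToUrl(String spec) {
        try {
            URI.create(spec).toURL();
            return "ok";
        } catch (IllegalArgumentException e) {
            // Thrown by URI.toURL() when the URI has no scheme ("URI is not absolute").
            return "IllegalArgumentException";
        } catch (MalformedURLException e) {
            // Thrown when no protocol handler exists for the scheme.
            return "MalformedURLException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryToUrl("hdfs://namenode/user/abain/bar")); // problem 1
        System.out.println(tryToUrl("s3://bucket/key"));                // problem 1
        System.out.println(tryToUrl("/foo/bar"));                       // problem 2
        System.out.println(tryToUrl("file:///foo/bar"));                // works
    }
}
```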

I think there are basically two ways to get around these problems:

1. Instead of java.net.URLs, identify resources with YARN URLs (which can have any scheme) and do appropriate conversions everywhere. This is the approach I took for my original implementation of STREAM. I identified SHIP resources with YARN URLs with a file:// scheme and CACHE resources already present in the remote FS with an hdfs:// scheme. Rohini, as you pointed out, maybe this is not such a good way.

2. Change the resources data structure from Map<URL, Path> to Map<String, Path>, where the string is the resource name. In the end, this is basically what we give to the AM anyway. The resource names become symlinks in the container's working directory to the file pointed to by the Path (in the remote FS).

In this second approach, if you SHIP('/foo/bar'), I first copy bar into the staging directory for the job in the remote FS, and then map "bar" -> the path in the remote FS. If you CACHE('/foo/bar#fragment'), I just map "fragment" -> /foo/bar, which should already exist in the remote FS.

In this patch, I changed to the second approach.
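
A rough sketch of this name-keyed map, using only the JDK: java.net.URI stands in for Hadoop's Path, the copy into the staging directory is left out, and the class and method names (ResourceMapSketch, addShipped, addCached) and the staging URI are hypothetical.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

final class ResourceMapSketch {
    // Resource name (symlink name in the container's working directory)
    // -> location of the file in the remote FS.
    private final Map<String, URI> resources = new LinkedHashMap<>();
    private final URI stagingDir; // must end with '/' for resolve() to append

    ResourceMapSketch(URI stagingDir) {
        this.stagingDir = stagingDir;
    }

    // SHIP: a local file that ends up in the job's staging directory.
    void addShipped(String localPath) {
        String name = localPath.substring(localPath.lastIndexOf('/') + 1);
        // A real implementation would copy the local file to the remote FS here.
        resources.put(name, stagingDir.resolve(name));
    }

    // CACHE: a file assumed to exist in the remote FS already.
    void addCached(String spec) {
        URI uri = URI.create(spec);
        String path = uri.getPath();
        String name = uri.getFragment() != null
                ? uri.getFragment()
                : path.substring(path.lastIndexOf('/') + 1);
        resources.put(name, URI.create(path));
    }

    Map<String, URI> resources() {
        return resources;
    }

    public static void main(String[] args) {
        ResourceMapSketch plan = new ResourceMapSketch(URI.create("hdfs://namenode/staging/job1/"));
        plan.addShipped("/foo/bar");
        plan.addCached("/foo/bar#fragment");
        plan.resources().forEach((name, target) -> System.out.println(name + " -> " + target));
    }
}
```

Because the key is the resource name rather than the file location, the same remote file can be registered under several names without one entry replacing another.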


> On Dec. 26, 2013, 10:08 a.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java, line 60
> > <https://reviews.apache.org/r/16309/diff/3/?file=402932#file402932line60>
> >
> >     The conversion to org.apache.hadoop.yarn.api.records.URL in multiple places for local files seems unnecessary. It does not seem to be required and complicates the code. Only LocalResource.newInstance takes org.apache.hadoop.yarn.api.records.URL in getTezResources, but we are still doing a ConverterUtils.getYarnUrlFromPath(fstat.getPath()) there. Can we just use java.net.URL everywhere?

I removed the YARN URLs and now use java.net.URL only to identify files to ship from the local filesystem to the remote FS.


- Alex


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16309/#review30868
-----------------------------------------------------------


On Dec. 23, 2013, 5:34 p.m., Alex Bain wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/16309/
> -----------------------------------------------------------
> 
> (Updated Dec. 23, 2013, 5:34 p.m.)
> 
> 
> Review request for pig, Cheolsoo Park, Daniel Dai, Mark Wagner, and Rohini Palaniswamy.
> 
> 
> Bugs: PIG-3629
>     https://issues.apache.org/jira/browse/PIG-3629
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Implement STREAM operator in Tez - https://issues.apache.org/jira/browse/PIG-3629
> 
> In this patch, I do not add resources to pig-misc.jar, I just add them individually. See my discussion post: https://groups.google.com/forum/#!topic/pig-on-tez/8S80GMKhMaU
> 
> Basic Changes:
> -Run the PhyPlanSetter and EndOfAllInputSetter to set the parent plan and the end-of-all input flags necessary for STREAM, just like in MR Pig.
> -Add a map to hold plan-specific extra local resources in TezOperPlan.java. These resources can either come from the user's directory (e.g. SHIP('/home/abain/foo')) or from HDFS (e.g. CACHE('/user/abain/bar') in HDFS).
> -Add the new class TezPOStreamVisitor that assembles all the plan-specific local resources that get added in TezOperPlan.java.
> 
> Resource Manager Changes:
> -TezResourceManager resources were previously a map of java.net.URL -> Path in HDFS. Previously, the URLs were all local files, e.g. file://home/abain/pig-withouthadoop.jar. However, the CACHE statement requires that resources already present in HDFS can be added as local resources. Unfortunately, java.net.URL does not support hdfs:// URLs, so I changed this data structure to use YARN URLs instead. I also added methods to the resource manager to distinguish whether you are adding a local resource or a resource already present in HDFS.
> -CACHE also supports URLs with a fragment at the end, which becomes a "shortcut" for the name, e.g. CACHE(/input/big-data-name.gz#data.gz). I changed the resource manager to look for a fragment and use it as the resource name (if the fragment exists). This results in the symlink to the resource being created with the fragment name, which is what we want.
> 
> Race condition:
> -I found a race condition that resulted from reusing the Result object in POSimpleTezLoad. There are several possible solutions. After discussing in the newsgroup, we decided to change POSimpleTezLoad for now.
> -I also made a small cleanup to PhysicalOperator.java.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java 37566ab 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 8487a0f 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java d57aded 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java 0ee7256 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java 191563d 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java df9fea6 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java 135b933 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java 0cc8e17 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java 673fd70 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java 0fd7575 
>   src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java 862e637 
>   test/e2e/pig/tests/tez.conf 0e4ba4e 
>   test/org/apache/pig/test/data/GoldenFiles/TEZC12.gld PRE-CREATION 
>   test/org/apache/pig/tez/TestTezCompiler.java 8d5e5f2 
> 
> Diff: https://reviews.apache.org/r/16309/diff/
> 
> 
> Testing
> -------
> 
> Added a unit test to TestTezCompiler.java
> Added a new e2e test to tez.conf with session reuse enabled
> Ported three other e2e tests from streaming.conf to tez.conf to increase coverage
> 
> ant test-tez passes
> ant test-e2e-tez passes
> Manually tested with a large subset of tests from streaming.conf (the ones using features currently supported by Pig-on-Tez), they pass
> 
> 
> Thanks,
> 
> Alex Bain
> 
>


Re: Review Request 16309: PIG-3629 Implement STREAM operator in Tez

Posted by Rohini Palaniswamy <ro...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16309/#review30868
-----------------------------------------------------------


Alex,
   Sorry for the late review. The code for shipping local resources needs some fixing. Review comments below. I know the patch is already committed. If you can fix these, I will commit them as an addendum patch.


src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
<https://reviews.apache.org/r/16309/#comment59095>

    The implementation of these does not look right to me. It does not honor FileSystem and fs.defaultFS and assumes files are to be shipped to HDFS (local mode, s3, etc. won't work). This needs to be fixed.
    
    Also, whenever URL or URI conversion is required, use the Path class,
    i.e. new Path(filepath).toUri().toURL() wherever a path is required. The Path class takes care of normalizing the URL for the path and does some Windows-specific handling.



src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
<https://reviews.apache.org/r/16309/#comment59096>

    You can directly call addExtraResource instead of adding to a set and iterating over it.



src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
<https://reviews.apache.org/r/16309/#comment59097>

    We shouldn't be parsing for #. You can use URI.getPath() to get the path without the fragment.



src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
<https://reviews.apache.org/r/16309/#comment59098>

    You can call addExtraResource directly instead of adding to a map and then iterating over the map.
    
    Also, in this case the same file could be symlinked to different names. We need to handle them instead of ignoring entries because the key is the same.



src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
<https://reviews.apache.org/r/16309/#comment59090>

    The conversion to org.apache.hadoop.yarn.api.records.URL in multiple places for local files seems unnecessary. It does not seem to be required and complicates the code. Only LocalResource.newInstance takes org.apache.hadoop.yarn.api.records.URL in getTezResources, but we are still doing a ConverterUtils.getYarnUrlFromPath(fstat.getPath()) there. Can we just use java.net.URL everywhere?



src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
<https://reviews.apache.org/r/16309/#comment59091>

    This check should be removed. You are allowed to ship from any FileSystem implementation to the destination. The shipToHDFS method in Utils is a misnomer. It ships to the fs.defaultFS. We should rename the method to shipToDefaultFS



src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
<https://reviews.apache.org/r/16309/#comment59092>

    We should not be having this check. The destination filesystem could be anything, e.g. s3. Can we rename all variables and methods in TezResourceManager and TezOperPlan that have hdfs in their names to, say, defaultFS, srcFS, or remoteFS as appropriate?


- Rohini Palaniswamy




Re: Review Request 16309: PIG-3629 Implement STREAM operator in Tez

Posted by Cheolsoo Park <pi...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16309/#review30848
-----------------------------------------------------------

Ship it!


Looks good to me. I will commit it after running tests.

When committing the patch, I will also fix my minor comments below as well as the whitespace. Thank you Alex!


src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
<https://reviews.apache.org/r/16309/#comment59052>

    Let's use Boolean.valueOf() instead of String.equals() here.
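
    The difference between the two checks, as a small sketch (the class and method names are hypothetical and the flag value is made up; it is not the property read in PigProcessor.java):

```java
final class FlagParsing {
    // Exact string comparison: only the literal "true" counts.
    static boolean viaEquals(String value) {
        return "true".equals(value);
    }

    // Boolean.valueOf(String): case-insensitive, and null yields false.
    static boolean viaValueOf(String value) {
        return Boolean.valueOf(value);
    }

    public static void main(String[] args) {
        System.out.println(viaEquals("True"));   // false
        System.out.println(viaValueOf("True"));  // true
        System.out.println(viaValueOf(null));    // false
    }
}
```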



src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java
<https://reviews.apache.org/r/16309/#comment59051>

    Apache header is missing.


- Cheolsoo Park


On Dec. 24, 2013, 1:34 a.m., Alex Bain wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/16309/
> -----------------------------------------------------------
> 
> (Updated Dec. 24, 2013, 1:34 a.m.)
> 
> 
> Review request for pig, Cheolsoo Park, Daniel Dai, Mark Wagner, and Rohini Palaniswamy.
> 
> 
> Bugs: PIG-3629
>     https://issues.apache.org/jira/browse/PIG-3629
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Implement STREAM operator in Tez - https://issues.apache.org/jira/browse/PIG-3629
> 
> In this patch, I do not add resources to pig-misc.jar, I just add them individually. See my discussion post: https://groups.google.com/forum/#!topic/pig-on-tez/8S80GMKhMaU
> 
> Basic Changes:
> -Run the PhyPlanSetter and EndOfAllInputSetter to set the parent plan and the end-of-all input flags necessary for STREAM, just like in MR Pig.
> -Add a map to hold plan-specific extra local resources in TezOperPlan.java. These resources can either come from the user's directory (e.g. SHIP('/home/abain/foo')) or from HDFS (e.g. CACHE('/user/abain/bar') in HDFS).
> -Add the new class TezPOStreamVisitor that assembles all the plan-specific local resources that get added in TezOperPlan.java.
> 
> Resource Manager Changes:
> -TezResourcManager resources were previously a map of java.net.URL -> Path in HDFS. Previously, the URL's were all local files, e.g. file://home/abain/pig-withouthHadoop.jar. However, the CACHE statement requires that resources already present in HDFS be able to be added as local resources. Unfortunately java.net.URL does not support hdfs:// URL's, so I changed this data structure to be a YARN URL instead. I also added methods to the ResourceManager to distinguish whether you are adding a local resource or a resource already present in HDFS.
> -CACHE also supports URL's with fragments at the end, which become a "shortcut" to the name, e.g. CACHE(/input/big-data-name.gz#data.gz). I changed the resource manager to look for a fragments and use that as the resource name (if the fragment exist). This results in the symlink to the resource being created with the fragment name, which is what we want.
> 
> Race condition:
> -I found a race condition that resulted from reusing the Result object in POSimpleTezLoad. There are several possible solutions. After discussing in the newsgroup, we decided to change POSimpleTezLoad for now.
> -I also made a small cleanup to PhysicalOperator.java.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java 37566ab 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 8487a0f 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java d57aded 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java 0ee7256 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java 191563d 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java df9fea6 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java 135b933 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java 0cc8e17 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java 673fd70 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java 0fd7575 
>   src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java 862e637 
>   test/e2e/pig/tests/tez.conf 0e4ba4e 
>   test/org/apache/pig/test/data/GoldenFiles/TEZC12.gld PRE-CREATION 
>   test/org/apache/pig/tez/TestTezCompiler.java 8d5e5f2 
> 
> Diff: https://reviews.apache.org/r/16309/diff/
> 
> 
> Testing
> -------
> 
> Added a unit test to TestTezCompiler.java
> Added a new e2e test to tez.conf with session reuse enabled
> Ported three other e2e tests from streaming.conf to tez.conf to increase coverage
> 
> ant test-tez passes
> ant test-e2e-tez passes
> Manually tested with a large subset of tests from streaming.conf (the ones using features currently supported by Pig-on-Tez); they pass
> 
> 
> Thanks,
> 
> Alex Bain
> 
>


Re: Review Request 16309: PIG-3629 Implement STREAM operator in Tez

Posted by Cheolsoo Park <pi...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16309/#review30849
-----------------------------------------------------------



test/e2e/pig/tests/tez.conf
<https://reviews.apache.org/r/16309/#comment59053>

    I believe you're overwriting join test cases by mistake. One more below.


- Cheolsoo Park


On Dec. 24, 2013, 1:34 a.m., Alex Bain wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/16309/
> -----------------------------------------------------------
> 
> (Updated Dec. 24, 2013, 1:34 a.m.)
> 
> 
> Review request for pig, Cheolsoo Park, Daniel Dai, Mark Wagner, and Rohini Palaniswamy.
> 
> 
> Bugs: PIG-3629
>     https://issues.apache.org/jira/browse/PIG-3629
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Implement STREAM operator in Tez - https://issues.apache.org/jira/browse/PIG-3629
> 
> In this patch, I do not add resources to pig-misc.jar, I just add them individually. See my discussion post: https://groups.google.com/forum/#!topic/pig-on-tez/8S80GMKhMaU
> 
> Basic Changes:
> -Run the PhyPlanSetter and EndOfAllInputSetter to set the parent plan and the end-of-all input flags necessary for STREAM, just like in MR Pig.
> -Add a map to hold plan-specific extra local resources in TezOperPlan.java. These resources can either come from the user's local directory (e.g. SHIP('/home/abain/foo')) or from HDFS (e.g. CACHE('/user/abain/bar')).
> -Add the new class TezPOStreamVisitor that assembles all the plan-specific local resources that get added in TezOperPlan.java.
> 
> Resource Manager Changes:
> -TezResourceManager resources were previously a map of java.net.URL -> Path in HDFS. The URLs were all local files, e.g. file:///home/abain/pig-withouthadoop.jar. However, the CACHE statement requires that resources already present in HDFS can be added as local resources. Unfortunately, java.net.URL does not support hdfs:// URLs, so I changed this data structure to use a YARN URL instead. I also added methods to TezResourceManager to distinguish whether you are adding a local resource or a resource already present in HDFS.
> -CACHE also supports URLs with a fragment at the end, which becomes a "shortcut" name for the resource, e.g. CACHE(/input/big-data-name.gz#data.gz). I changed the resource manager to look for a fragment and use it as the resource name (if the fragment exists). This results in the symlink to the resource being created with the fragment name, which is what we want.
> 
> Race condition:
> -I found a race condition that resulted from reusing the Result object in POSimpleTezLoad. There are several possible solutions. After discussing in the newsgroup, we decided to change POSimpleTezLoad for now.
> -I also made a small cleanup to PhysicalOperator.java.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java 37566ab 
>   src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 8487a0f 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java d57aded 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java 0ee7256 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java 191563d 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java df9fea6 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java 135b933 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java 0cc8e17 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java 673fd70 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java 0fd7575 
>   src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java 862e637 
>   test/e2e/pig/tests/tez.conf 0e4ba4e 
>   test/org/apache/pig/test/data/GoldenFiles/TEZC12.gld PRE-CREATION 
>   test/org/apache/pig/tez/TestTezCompiler.java 8d5e5f2 
> 
> Diff: https://reviews.apache.org/r/16309/diff/
> 
> 
> Testing
> -------
> 
> Added a unit test to TestTezCompiler.java
> Added a new e2e test to tez.conf with session reuse enabled
> Ported three other e2e tests from streaming.conf to tez.conf to increase coverage
> 
> ant test-tez passes
> ant test-e2e-tez passes
> Manually tested with a large subset of tests from streaming.conf (the ones using features currently supported by Pig-on-Tez); they pass
> 
> 
> Thanks,
> 
> Alex Bain
> 
>


Re: Review Request 16309: PIG-3629 Implement STREAM operator in Tez

Posted by Alex Bain <am...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16309/
-----------------------------------------------------------

(Updated Dec. 23, 2013, 5:34 p.m.)


Review request for pig, Cheolsoo Park, Daniel Dai, Mark Wagner, and Rohini Palaniswamy.


Changes
-------

Updated with fix to race condition and e2e tests


Bugs: PIG-3629
    https://issues.apache.org/jira/browse/PIG-3629


Repository: pig-git


Description (updated)
-------

Implement STREAM operator in Tez - https://issues.apache.org/jira/browse/PIG-3629

In this patch, I do not add resources to pig-misc.jar, I just add them individually. See my discussion post: https://groups.google.com/forum/#!topic/pig-on-tez/8S80GMKhMaU

Basic Changes:
-Run the PhyPlanSetter and EndOfAllInputSetter to set the parent plan and the end-of-all input flags necessary for STREAM, just like in MR Pig.
-Add a map to hold plan-specific extra local resources in TezOperPlan.java. These resources can either come from the user's local directory (e.g. SHIP('/home/abain/foo')) or from HDFS (e.g. CACHE('/user/abain/bar')).
-Add the new class TezPOStreamVisitor that assembles all the plan-specific local resources that get added in TezOperPlan.java.

Resource Manager Changes:
-TezResourceManager resources were previously a map of java.net.URL -> Path in HDFS. The URLs were all local files, e.g. file:///home/abain/pig-withouthadoop.jar. However, the CACHE statement requires that resources already present in HDFS can be added as local resources. Unfortunately, java.net.URL does not support hdfs:// URLs, so I changed this data structure to use a YARN URL instead. I also added methods to TezResourceManager to distinguish whether you are adding a local resource or a resource already present in HDFS.
-CACHE also supports URLs with a fragment at the end, which becomes a "shortcut" name for the resource, e.g. CACHE(/input/big-data-name.gz#data.gz). I changed the resource manager to look for a fragment and use it as the resource name (if the fragment exists). This results in the symlink to the resource being created with the fragment name, which is what we want.
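
For illustration only, the fragment-to-name rule described in the last item can be sketched as follows. This is a minimal sketch, not code from the patch: it uses only java.net.URI from the JDK rather than the YARN URL type, and the class name ResourceNameSketch and method resourceName are hypothetical.

```java
import java.net.URI;

class ResourceNameSketch {
    /**
     * Hypothetical helper (not from the patch): pick the name under which a
     * resource would be localized. If the location carries a fragment
     * ("#name"), use the fragment; otherwise fall back to the last path
     * component.
     */
    static String resourceName(String location) {
        URI uri = URI.create(location);
        String fragment = uri.getFragment();
        if (fragment != null && !fragment.isEmpty()) {
            return fragment;
        }
        String path = uri.getPath();
        return path.substring(path.lastIndexOf('/') + 1);
    }

    public static void main(String[] args) {
        // Fragment present: the fragment becomes the resource (symlink) name.
        System.out.println(resourceName("/input/big-data-name.gz#data.gz")); // data.gz
        // No fragment: the file name is used. java.net.URI accepts hdfs://.
        System.out.println(resourceName("hdfs://namenode/user/abain/bar")); // bar
    }
}
```

The host name "namenode" in the second call is a placeholder. The sketch only covers the naming rule; how the resource is registered and localized is outside its scope.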

Race condition:
-I found a race condition that resulted from reusing the Result object in POSimpleTezLoad. There are several possible solutions. After discussing in the newsgroup, we decided to change POSimpleTezLoad for now.
-I also made a small cleanup to PhysicalOperator.java.
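
The message above does not spell out the details of the race, so the following is only a generic sketch of the hazard of reusing one mutable result object, under the assumption that a caller may still hold a previously returned result when the next one is produced. All class and method names (Result, ReusingLoad, FreshLoad, drainReusing, drainFresh) are hypothetical stand-ins and are not the Pig classes; allocating a fresh object per call is shown as one possible remedy, not necessarily the one chosen in the patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ResultReuseSketch {
    /** Hypothetical stand-in for a mutable result holder. */
    static final class Result {
        Object value;
    }

    /** Returns the same Result instance on every call, overwriting its value. */
    static final class ReusingLoad {
        private final Iterator<String> input;
        private final Result shared = new Result();

        ReusingLoad(List<String> rows) {
            this.input = rows.iterator();
        }

        Result getNext() {
            shared.value = input.hasNext() ? input.next() : null;
            return shared;
        }
    }

    /** Allocates a new Result per call, so earlier results are never overwritten. */
    static final class FreshLoad {
        private final Iterator<String> input;

        FreshLoad(List<String> rows) {
            this.input = rows.iterator();
        }

        Result getNext() {
            Result r = new Result();
            r.value = input.hasNext() ? input.next() : null;
            return r;
        }
    }

    /** Holds on to every returned Result, then reads the values afterwards. */
    static List<Object> drainReusing(List<String> rows) {
        ReusingLoad load = new ReusingLoad(rows);
        List<Result> held = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            held.add(load.getNext());
        }
        List<Object> values = new ArrayList<>();
        for (Result r : held) {
            values.add(r.value);
        }
        return values;
    }

    /** Same as drainReusing, but against the load that allocates per call. */
    static List<Object> drainFresh(List<String> rows) {
        FreshLoad load = new FreshLoad(rows);
        List<Result> held = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            held.add(load.getNext());
        }
        List<Object> values = new ArrayList<>();
        for (Result r : held) {
            values.add(r.value);
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c");
        System.out.println(drainReusing(rows)); // [c, c, c]
        System.out.println(drainFresh(rows));   // [a, b, c]
    }
}
```

The sketch is single-threaded on purpose: it shows the aliasing deterministically. With a second thread consuming results, the same overwrite would surface as a timing-dependent race.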


Diffs (updated)
-----

  src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java 37566ab 
  src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java 8487a0f 
  src/org/apache/pig/backend/hadoop/executionengine/tez/POSimpleTezLoad.java d57aded 
  src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java 0ee7256 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java 191563d 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java df9fea6 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java 135b933 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java 0cc8e17 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java PRE-CREATION 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java 673fd70 
  src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java 0fd7575 
  src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java 862e637 
  test/e2e/pig/tests/tez.conf 0e4ba4e 
  test/org/apache/pig/test/data/GoldenFiles/TEZC12.gld PRE-CREATION 
  test/org/apache/pig/tez/TestTezCompiler.java 8d5e5f2 

Diff: https://reviews.apache.org/r/16309/diff/


Testing (updated)
-------

Added a unit test to TestTezCompiler.java
Added a new e2e test to tez.conf with session reuse enabled
Ported three other e2e tests from streaming.conf to tez.conf to increase coverage

ant test-tez passes
ant test-e2e-tez passes
Manually tested with a large subset of tests from streaming.conf (the ones using features currently supported by Pig-on-Tez); they pass


Thanks,

Alex Bain


Re: Review Request 16309: PIG-3629 Implement STREAM operator in Tez

Posted by Daniel Dai <da...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16309/#review30644
-----------------------------------------------------------


Looks good. Can you add a few e2e tests to it?

- Daniel Dai


On Dec. 18, 2013, 3:53 a.m., Alex Bain wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/16309/
> -----------------------------------------------------------
> 
> (Updated Dec. 18, 2013, 3:53 a.m.)
> 
> 
> Review request for pig, Cheolsoo Park, Daniel Dai, Mark Wagner, and Rohini Palaniswamy.
> 
> 
> Bugs: PIG-3629
>     https://issues.apache.org/jira/browse/PIG-3629
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Implement STREAM operator in Tez - https://issues.apache.org/jira/browse/PIG-3629
> 
> In this patch, I do not add resources to pig-misc.jar, I just add them individually. See my discussion post: https://groups.google.com/forum/#!topic/pig-on-tez/8S80GMKhMaU
> 
> Basic Changes:
> -Run the PhyPlanSetter and EndOfAllInputSetter to set the parent plan and the end-of-all input flags necessary for STREAM, just like in MR Pig.
> -Add a map to hold plan-specific extra local resources in TezOperPlan.java. These resources can either come from the user's local directory (e.g. SHIP('/home/abain/foo')) or from HDFS (e.g. CACHE('/user/abain/bar')).
> -Add the new class TezPOStreamVisitor that assembles all the plan-specific local resources that get added in TezOperPlan.java.
> 
> Resource Manager Changes:
> -TezResourceManager resources were previously a map of java.net.URL -> Path in HDFS. The URLs were all local files, e.g. file:///home/abain/pig-withouthadoop.jar. However, the CACHE statement requires that resources already present in HDFS can be added as local resources. Unfortunately, java.net.URL does not support hdfs:// URLs, so I changed this data structure to use a YARN URL instead. I also added methods to TezResourceManager to distinguish whether you are adding a local resource or a resource already present in HDFS.
> -CACHE also supports URLs with a fragment at the end, which becomes a "shortcut" name for the resource, e.g. CACHE(/input/big-data-name.gz#data.gz). I changed the resource manager to look for a fragment and use it as the resource name (if the fragment exists). This results in the symlink to the resource being created with the fragment name, which is what we want.
> 
> 
> Diffs
> -----
> 
>   src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java 37566ab 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java 7a1736a 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java 2584501 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java 96ccdde 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobControlCompiler.java 135b933 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java 0cc8e17 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOStreamVisitor.java PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezPlanContainer.java 673fd70 
>   src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java 0fd7575 
> 
> Diff: https://reviews.apache.org/r/16309/diff/
> 
> 
> Testing
> -------
> 
> Added a unit test to TestTezCompiler.java
> Added an e2e test to tez.conf
> 
> ant test-tez passes
> ant test-e2e-tez has three failures - I am investigating to see if they are related or perhaps just transient
> 
> Question: There is already a separate suite of STREAM tests in streaming.conf. Maybe I should remove my e2e test and we should add streaming.conf as a dependency to the test-e2e-tez target? I haven't tried to run streaming.conf yet.
> 
> 
> Thanks,
> 
> Alex Bain
> 
>