Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 08:19:46 UTC

svn commit: r1784237 [5/22] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml Fri Feb 24 08:19:42 2017
@@ -713,7 +713,7 @@ This will cause an error …</source>
    <p></p>
    <ul>
       <li>
-         <p>If Pig cannot resolve incompatible types through implicit casts, an error will occur. For example, you cannot add chararray and float (see the Types Table for addition and subtraction).</p>
+         <p>If Pig cannot resolve incompatible types through implicit casts, an error will occur. For example, you cannot add chararray and float (see the <a href="#types-table-add">Types Table for addition and subtraction</a>).</p>
       <source>
 A = LOAD 'data' AS (name:chararray, age:int, gpa:float);
 B = FOREACH A GENERATE name + gpa;
@@ -1335,6 +1335,26 @@ dump X;
          </tr>
          <tr>
             <td>
+               <p>biginteger</p>
+            </td>
+            <td>
+               <p>19211921192119211921BI</p>
+            </td>
+            <td>
+            </td>
+         </tr>
+         <tr>
+            <td>
+               <p>bigdecimal</p>
+            </td>
+            <td>
+               <p>192119211921.192119211921BD</p>
+            </td>
+            <td>
+            </td>
+         </tr>
+         <tr>
+            <td>
                <p><strong>Complex Data Types</strong></p>
             </td>
             <td>
@@ -1388,7 +1408,18 @@ dump X;
          <p>To specify a long constant, l or L must be appended to the number (for example, 12345678L). If the l or L is not specified, but the number is too large to fit into an int, the problem will be detected at parse time and the processing is terminated. </p>
       </li>
       <li>
-         <p>Any numeric constant with decimal point (for example, 1.5) and/or exponent (for example, 5e+1) is treated as double unless it ends with f or F in which case it is assigned type float (for example,  1.5f). </p>
+         <p>Any numeric constant with decimal point (for example, 1.5) and/or exponent (for example, 5e+1) is treated as double unless it ends with the following characters:</p>
+         <ul>
+            <li>
+               <p>f or F in which case it is assigned type float (for example,  1.5f)</p>
+            </li>
+            <li>
+               <p>BD or bd in which case it is assigned type BigDecimal (for example,  12345678.12345678BD)</p>
+            </li>
+         </ul>
+      </li>
+      <li>
+         <p>BigIntegers can be specified by supplying BI or bi at the end of the number (for example, 123456789123456BI)</p>
       </li>
       <li>
          <p>There is no native constant type for datetime field. You can use a ToDate udf with chararray constant as argument to generate a datetime value. </p>
@@ -5394,8 +5425,8 @@ D = foreach C generate y; -- which y?
    <section  id="flatten">
    <title>Flatten Operator</title>
    <p>The FLATTEN operator looks like a UDF syntactically, but it is actually an operator that changes the structure of tuples 
-   and bags in a way that a UDF cannot. Flatten un-nests tuples as well as bags. The idea is the same, but the operation and 
-   result is different for each type of structure.</p>
+   and bags in a way that a UDF cannot. Flatten un-nests tuples, bags and maps. The idea is the
+      same, but the operation and result are different for each type of structure.</p>
 
    <p>For tuples, flatten substitutes the fields of a tuple in place of the tuple. For example, consider a relation that has a tuple 
    of the form (a, (b, c)). The expression GENERATE $0, flatten($1), will cause that tuple to become (a, b, c).</p>
@@ -5405,6 +5436,14 @@ D = foreach C generate y; -- which y?
    tuples (b,c) and (d,e). When we remove a level of nesting in a bag, sometimes we cause a cross product to happen. 
    For example, consider a relation that has a tuple of the form (a, {(b,c), (d,e)}), commonly produced by the GROUP operator. 
    If we apply the expression GENERATE $0, flatten($1) to this tuple, we will create new tuples: (a, b, c) and (a, d, e).</p>
+
+   <p>For maps, flatten creates a tuple with two fields containing the key and value.
+      If we have a map field named kvpair with input (m[k1#v1, k2#v2]) and we apply GENERATE flatten(kvpair),
+      it generates two tuples, (k1,v1) and (k2,v2), whose fields can be accessed as kvpair::key and
+      kvpair::value.<br/>When there are additional projections in the expression, a cross product happens, similar
+      to bags. For example, if we apply the expression GENERATE $0, FLATTEN($1) to the input tuple (a, m[k1#1, k2#2, k3#3]),
+      we will see (a,k1,1), (a,k2,2) and (a,k3,3) as the result.
+   </p>
    
    <p>Also note that the flatten of empty bag will result in that row being discarded; no output is generated. 
    (See also <a href="perf.html#nulls">Drop Nulls Before a Join</a>.) </p>
@@ -6488,6 +6527,16 @@ D = FOREACH C GENERATE flatten(A), flatt
 E = GROUP D BY A::x;
 ……
 </source>
+
+   <p>A FLATTEN example on a map type. Here we load an integer and a map (of integer values) into A. Then m gets
+      flattened, and finally we filter the result to include only the tuples where the value of the un-nested
+      map entry is 5.</p>
+<source>
+A = LOAD 'data' AS (a:int, m:map[int]);
+B = FOREACH A GENERATE a, FLATTEN(m);
+C = FILTER B BY m::value == 5;
+……
+</source>
    
    </section>
    
@@ -6924,7 +6973,7 @@ public class SimpleCustomPartitioner ext
    <table>
       <tr> 
             <td>
-               <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];  </p>
+               <p>alias = JOIN alias BY {expression|'('expression [, expression …]')'} (, alias BY {expression|'('expression [, expression …]')'} …) [USING 'replicated' | 'bloom' | 'skewed' | 'merge' | 'merge-sparse'] [PARTITION BY partitioner] [PARALLEL n];  </p>
             </td>
          </tr> 
    </table></section>
@@ -6973,7 +7022,16 @@ public class SimpleCustomPartitioner ext
                <p>Use to perform replicated joins (see <a href="perf.html#replicated-joins">Replicated Joins</a>).</p>
             </td>
          </tr>
-         
+
+         <tr>
+            <td>
+               <p>'bloom'</p>
+            </td>
+            <td>
+               <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+            </td>
+         </tr>
+
          <tr>
             <td>
                <p>'skewed'</p>
@@ -7111,7 +7169,7 @@ DUMP X;
       <tr> 
             <td>
                <p>alias = JOIN left-alias BY left-alias-column [LEFT|RIGHT|FULL] [OUTER], right-alias BY right-alias-column 
-               [USING 'replicated' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];  </p>
+               [USING 'replicated' | 'bloom' | 'skewed' | 'merge'] [PARTITION BY partitioner] [PARALLEL n];  </p>
             </td>
          </tr> 
    </table>
@@ -7182,7 +7240,7 @@ DUMP X;
             </td>
          </tr>
 
-  <tr>
+         <tr>
             <td>
                <p>USING</p>
             </td>
@@ -7199,8 +7257,18 @@ DUMP X;
                <p>Only left outer join is supported for replicated joins.</p>
             </td>
          </tr>
-         
-                  <tr>
+
+         <tr>
+            <td>
+               <p>'bloom'</p>
+            </td>
+            <td>
+               <p>Use to perform bloom joins (see <a href="perf.html#bloom-joins">Bloom Joins</a>).</p>
+               <p>Full outer join is not supported for bloom joins.</p>
+            </td>
+         </tr>
+
+         <tr>
             <td>
                <p>'skewed'</p>
             </td>
@@ -7293,6 +7361,13 @@ B = LOAD 'tiny';
 C= JOIN A BY $0 LEFT, B BY $0 USING 'replicated';
 </source>
 
+<p>This example shows a bloom right outer join.</p>
+<source>
+A = LOAD 'large';
+B = LOAD 'small';
+C= JOIN A BY $0 RIGHT, B BY $0 USING 'bloom';
+</source>
+
 <p>This example shows a skewed full outer join.</p>
 <source>
 A = LOAD 'studenttab' as (name, age, gpa);
@@ -7981,10 +8056,10 @@ X = SAMPLE A 0.01;
 </source>
 <p>In this example, a scalar expression is used (it will sample approximately 1000 records from the input).</p>
 <source>
-a = load 'a.txt';
-b = group a all;
-c = foreach b generate COUNT(a) as num_rows;
-e = sample a 1000/c.num_rows;
+a = LOAD 'a.txt';
+b = GROUP a ALL;
+c = FOREACH b GENERATE COUNT_STAR(a) AS num_rows;
+d = SAMPLE a (double)1000/c.num_rows;
 </source>
    </section></section>  
    
@@ -8407,7 +8482,7 @@ X = STREAM A THROUGH `stream.pl` as (f1:
    <table>
       <tr> 
             <td>
-               <p>alias = UNION [ONSCHEMA] alias, alias [, alias …];</p>
+               <p>alias = UNION [ONSCHEMA] alias, alias [, alias …] [PARALLEL n];</p>
             </td>
          </tr> 
    </table></section>
@@ -8434,6 +8509,16 @@ X = STREAM A THROUGH `stream.pl` as (f1:
                All inputs to the union must have a non-unknown (non-null) <a href="#schemas">schema</a>.</p>
             </td>
          </tr>
+         
+     <tr>
+        <td>
+           <p>PARALLEL n</p>
+        </td>
+        <td>
+           <p>This is only applicable in Tez execution mode and will not work in MapReduce mode. Specifying PARALLEL introduces an extra reduce step that slightly degrades performance; its primary purpose in this case is to control the number of output files.</p>
+           <p>For more information, see <a href="perf.html#parallel">Use the Parallel Features</a>.</p>
+        </td>
+     </tr>
    </table>
    </section>
    
@@ -8462,11 +8547,11 @@ B: (b1:long, b2:long, b3:long)
 A union B: null 
 </source>
   
-<p>Union columns with incompatible types result in a bytearray type: </p>
+<p>A union of columns with incompatible types results in a failure. (See the <a href="#types-table-add">Types Table for addition and subtraction</a> for incompatible types.)</p>
 <source>
-A: (a1:long, a2:long) 
-B: (b1:(b11:long, b12:long), b2:long) 
-A union B: (a1:bytearray, a2:long) 
+A: (a1:long)
+B: (a1:chararray)
+A union B: ERROR: Cannot cast from long to bytearray
 </source>
 
 <p>Union columns of compatible type will produce an "escalate" type. 
@@ -8730,8 +8815,8 @@ DUMP U;
    </ul>
    
    
-   <section>
-   <title>About Input and Output</title>
+   <section id="pig-streaming-input-output">
+   <title>About Input and Output for Streaming</title>
    <p>Serialization is needed to convert data from tuples to a format that can be processed by the streaming application. Deserialization is needed to convert the output from the streaming application back into tuples. PigStreaming is the default serialization/deserialization function.</p>
    
 <p>Streaming uses the same default format as PigStorage to serialize/deserialize the data. If you want to explicitly specify a format, you can do it as show below (see more examples in the Examples: Input/Output section).  </p> 
@@ -8762,7 +8847,7 @@ interface StreamToPig {
     public Tuple deserialize(byte[]) throws IOException;
 
     /**
-     * This will be called on the front end during planning and not on the back
+     * This will be called on both the front end and the back
      * end during execution.
      *
      * @return the {@link LoadCaster} associated with this object.

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/cont.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/cont.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/cont.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/cont.xml Fri Feb 24 08:19:42 2017
@@ -1348,7 +1348,7 @@ IMPORT 'my_macro.pig';
                <p>Preprocessor statement included in a Pig script.</p>
                <p>Use to describe one parameter in terms of other parameters.</p>
                <p>The declare statement is processed prior to running the Pig script. </p>
-               <p>The scope of a parameter value defined using declare is all the lines following the declare statement until the next declare statement that defines the same parameter is encountered.</p>
+               <p>The scope of a parameter value defined using declare is all the lines following the declare statement until the next declare statement that defines the same parameter is encountered. When used with the run/exec commands, see the <a href="#Parameter-Sub-scope">Scope</a> section.</p>
             </td>
          </tr>
          <tr>
@@ -1388,7 +1388,7 @@ IMPORT 'my_macro.pig';
       </li>
    </ul>
    <p></p>
-   <p>Parameter substitution may be used inside of macros, but it is the responsibility of the user to ensure that there are no conflicts between names of parameters defined at the top level and names of arguments or return values for a macro. A simple way to ensure this is to use ALL_CAPS for top-level parameters and lower_case for macro-level parameters. See <a href="#define-macros">DEFINE (macros)</a>.</p>
+   <p>Parameter substitution may be used inside of macros. When there are conflicts between names of parameters defined at the top level and names of arguments or return values for a given macro, the ones inside the macro are used. See <a href="#define-macros">DEFINE (macros)</a>.</p>
    </section>
    
    <section>
@@ -1424,6 +1424,12 @@ IMPORT 'my_macro.pig';
          <p>Declare and default preprocessors statements are processed in the order they appear in the Pig script. </p>
       </li>
    </ul>
+   </section>
+
+   <section id="Parameter-Sub-scope">
+   <title>Scope</title>
+   <p>The scope of parameters is global, except when used with the run/exec commands. The caller does not see parameters declared within the callee's scripts. See the <a href='#Parameter-Sub-scope-example'>example</a> for more details.</p>
+
    </section></section>
   
   
@@ -1532,6 +1538,28 @@ B = FILTER A BY $0>'5';
 <em>etc ... </em>
 </source>
    </section>
+   <section id="Parameter-Sub-scope-example">
+   <title>Scoping with run/exec commands </title>
+   <p> In this example, parameters passed to the run/exec commands or declared within the called scripts are not visible to the caller.</p>
+<source>
+/* main.pig */
+run -param var1=10 script1.pig
+exec script2.pig
+
+A = ...
+B = FOREACH A generate $var1, $var2, ...  --ERROR. unknown parameters var1, var2
+
+</source>
+<source>
+/* script1.pig */
+...
+</source>
+<source>
+/* script2.pig */
+%declare var2 20
+...
+</source>
+   </section>
    </section>
    </section>
 

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml Fri Feb 24 08:19:42 2017
@@ -294,6 +294,75 @@ team_parkyearslist = FOREACH (GROUP team
   </section>
 </section>
 
+<section id="bagtotuple">
+  <title>BagToTuple</title>
+  <p>Un-nests the elements of a bag into a tuple.</p>
+
+  <section>
+    <title>Syntax</title>
+    <table>
+      <tr>
+        <td>
+          <p>BagToTuple(expression)</p>
+        </td>
+      </tr>
+  </table></section>
+
+  <section>
+    <title>Terms</title>
+    <table>
+      <tr>
+        <td>
+         <p>expression</p>
+        </td>
+        <td>
+         <p>An expression with data type bag.</p>
+        </td>
+      </tr> 
+    </table>
+  </section>
+
+  <section>
+    <title>Usage</title>
+    <p>BagToTuple creates a tuple from the elements of a bag. It removes only
+      the first level of nesting; it does not recursively un-nest nested bags.
+      Unlike FLATTEN, BagToTuple will not generate multiple output records per
+      input record.
+    </p>
+  </section>
+  <section>
+    <title>Examples</title>
+    <p>In this example, a bag containing tuples with one field is converted to a tuple.</p>
+<source>
+A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:chararray)});
+
+DUMP A;
+({('a'),('b'),('c')})
+({('d'),('e'),('f')})
+
+X = FOREACH A GENERATE BagToTuple(B1);
+
+DUMP X;
+(('a','b','c'))
+(('d','e','f'))
+</source>
+    <p>In this example, a bag containing tuples with two fields is converted to a tuple.</p>
+<source>
+A = LOAD 'bag_data' AS (B1:bag{T1:tuple(f1:int,f2:int)});
+
+DUMP A;
+({(4,1),(7,8),(4,9)})
+({(5,8),(4,3),(3,8)})
+
+X = FOREACH A GENERATE BagToTuple(B1);
+
+DUMP X;
+((4,1,7,8,4,9))
+((5,8,4,3,3,8))
+</source>
+  </section>
+</section>
+
 <section id="bloom">
   <title>Bloom</title>
   <p>Bloom filters are a common way to select a limited set of records before
@@ -1377,7 +1446,80 @@ DUMP X;
          </tr> 
    </table>
    </section></section>
-   
+  
+
+<!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+ <section id="in">
+ <title>IN</title>
+ <p>The IN operator lets you test whether an expression matches any value in a list of values. It is used to reduce the need for multiple OR conditions.</p>
+
+ <section>
+ <title>Syntax</title>
+ <table>
+     <tr>
+          <td>
+             <p>IN (expression)</p>
+          </td>
+       </tr>
+ </table></section>
+
+ <section>
+ <title>Terms</title>
+ <table>
+     <tr>
+          <td>
+             <p>expression</p>
+          </td>
+          <td>
+             <p>An expression with data types chararray, int, long, float, double, bigdecimal, biginteger or bytearray.</p>
+          </td>
+       </tr>
+ </table></section>
+
+ <section>
+ <title>Usage</title>
+ <p>The IN operator lets you test whether an expression matches any value in a list of values. It helps reduce the need for multiple OR conditions.</p>
+ </section>
+
+ <section>
+ <title>Example</title>
+ <p>In this example we keep only the records whose id is 4 or 6.</p>
+<source>
+A = load 'data' using PigStorage(',') AS (id:int, first:chararray, last:chararray, gender:chararray);
+
+DUMP A;
+(1,Christine,Romero,Female)
+(2,Sara,Hansen,Female)
+(3,Albert,Rogers,Male)
+(4,Kimberly,Morrison,Female)
+(5,Eugene,Baker,Male)
+(6,Ann,Alexander,Female)
+(7,Kathleen,Reed,Female)
+(8,Todd,Scott,Male)
+(9,Sharon,Mccoy,Female)
+(10,Evelyn,Rice,Female)
+
+X = FILTER A BY id IN (4, 6);
+DUMP X;
+(4,Kimberly,Morrison,Female)
+(6,Ann,Alexander,Female)
+</source>
+ </section>
+
+<p>In this example the id field is a biginteger and the NOT operator negates the IN list, so the records whose id appears in the list are excluded.</p>
+<source>
+A = load 'data' using PigStorage(',') AS (id:biginteger, first:chararray, last:chararray, gender:chararray); 
+X = FILTER A BY NOT id IN (1, 3, 5, 7, 9); 
+DUMP X;
+ 
+(2,Sara,Hansen,Female)
+(4,Kimberly,Morrison,Female)
+(6,Ann,Alexander,Female)
+(8,Todd,Scott,Male)
+(10,Evelyn,Rice,Female)
+</source>
+</section>
+
      <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
    <section id="tokenize">
    <title>TOKENIZE</title>
@@ -1442,7 +1584,7 @@ DUMP X;
 <source>
 A = LOAD 'data' AS (f1:chararray);
-B = FOREACH A TOKENIZE (f1,'||');
+B = FOREACH A GENERATE TOKENIZE (f1,'||');
 DUMP B;
 </source>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml Fri Feb 24 08:19:42 2017
@@ -42,7 +42,7 @@
   </section>
   <section id="container-reuse">
     <title>Tez session/container reuse</title>
-    <p>One downside of MapReduce is the startup cost for a job is very high. That hurts the performance especially for small job. Tez alleviate the problem by using session and container reuse, so it is not necessary to start an application master for every job, and start a JVM for every task. By default, session/container reuse is on and we usually shall not turn it off. JVM reuse might cause some side effect if static variable is used since static variable might live across different jobs. So if static variable is used in EvalFunc/LoadFunc/StoreFunc, be sure to implement a cleanup function and register with <a href="http://pig.apache.org/docs/r0.14.0/api/org/apache/pig/JVMReuseManager.html">JVMReuseManager</a>.</p>
+    <p>One downside of MapReduce is that the startup cost for a job is very high, which hurts performance, especially for small jobs. Tez alleviates the problem by using session and container reuse, so it is not necessary to start an application master for every job or a JVM for every task. By default, session/container reuse is on and usually should not be turned off. JVM reuse might cause side effects if static variables are used, since a static variable might live across different jobs. So if a static variable is used in an EvalFunc/LoadFunc/StoreFunc, be sure to implement a cleanup function and register it with <a href="http://pig.apache.org/docs/r0.16.0/api/org/apache/pig/JVMReuseManager.html">JVMReuseManager</a>.</p>
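+    <p>A minimal sketch of the cleanup pattern (the UDF class and its static field are illustrative; see <a href="udf.html#tez-jvm-reuse">Clean up static variable in Tez</a> for details):</p>
+<source>
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
+import org.apache.pig.data.Tuple;
+
+public class MyUdf extends EvalFunc&lt;String&gt; {
+    // hypothetical static cache that must not leak across jobs when the JVM is reused
+    private static Map&lt;String, String&gt; cache = new HashMap&lt;String, String&gt;();
+
+    static {
+        // register this class so Pig calls the annotated cleanup method between jobs
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(MyUdf.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        cache.clear();
+    }
+
+    @Override
+    public String exec(Tuple input) throws IOException {
+        return (input == null || input.size() == 0) ? null : (String) input.get(0);
+    }
+}
+</source>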
   </section>
   <section id="auto-parallelism">
     <title>Automatic parallelism</title>
@@ -957,6 +957,9 @@ You can include the PARALLEL clause with
 <a href="basic.html#join-inner">JOIN (inner)</a>, 
 <a href="basic.html#join-outer">JOIN (outer)</a>, and
 <a href="basic.html#order-by">ORDER BY</a>.
+  The PARALLEL clause can also be used with <a href="basic.html#union">UNION</a> if Tez is the execution mode.
+  It turns off the union optimization and introduces an extra reduce step.
+  Though this slightly degrades performance due to the extra step, it is very useful for controlling the number of output files, as shown in the example below.
 </p>
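+<p>A sketch of the usage (relation names and paths are illustrative):</p>
+<source>
+A = LOAD 'input1';
+B = LOAD 'input2';
+U = UNION A, B PARALLEL 10;  -- Tez only: adds a reduce step and controls the number of output files
+STORE U INTO 'output';
+</source>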
 
 <p>The number of reducers you need for a particular construct in Pig that forms a MapReduce boundary depends entirely on (1) your data and the number of intermediate keys you are generating in your mappers and (2) the partitioner and distribution of map (combiner) output keys. In the best cases we have seen that a reducer processing about 1 GB of data behaves efficiently.</p>
@@ -1199,6 +1202,100 @@ gets 1 GB of memory. Please share your o
 </section>
 <!-- END FRAGMENT REPLICATE JOINS-->
 
+<!-- BLOOM JOINS-->
+<!-- +++++++++++++++++++++++++++++++ -->
+<section id="bloom-joins">
+<title>Bloom Joins</title>
+<p>Bloom join is a special type of join where a bloom filter is constructed from the join keys of one relation and
+used to filter records of the other relations before doing a regular hash join.
+The amount of data sent to the reducers can be much smaller, depending on the number of records that are filtered on the map side.
+Bloom join is very useful in cases where the number of matching records between the relations in a join is small
+compared to the total number of records, allowing many records to be filtered before the join.
+Before bloom join was added as a join type, users achieved the same functionality with
+the <a href="func.html#bloom">builtin bloom udfs</a>, which are not as efficient and require more lines of code.
+Currently bloom join is only implemented in Tez execution mode. The builtin bloom udfs have to be used for other execution modes.</p>
+
+<section>
+<title>Usage</title>
+<p>Perform a bloom join with the USING clause (see <a href="basic.html#join-inner">JOIN (inner)</a> and <a href="basic.html#join-outer">JOIN (outer)</a>).
+In this example, a large relation is joined with two smaller relations. Note that the large relation comes first, followed by the smaller relations.
+The bloom filter is built from the join keys of the rightmost relation, which is small, and the filter is applied to the big and medium relations.
+None of the relations is required to fit into main memory. </p>
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+medium = LOAD 'medium_data' AS (m1,m2,m3);
+
+small = LOAD 'small_data' AS (s1,s2,s3);
+
+C = JOIN big BY b1, medium BY m1, small BY s1 USING 'bloom';
+</source>
+
+<p>
+In the case of an inner join or a right outer join, the rightmost relation is used for building the bloom filter,
+and users are expected to specify the smaller dataset as the rightmost relation.
+But in the case of a left outer join, the leftmost relation is used for building the bloom filter and is expected to be the smaller dataset.
+This is because all records of the outer relation should be in the result, so none of them can be filtered out.
+If the left relation turns out to be the bigger dataset, building the bloom filter on it is not as efficient,
+but it might still perform better than a regular join if it is able to filter a lot of records from the right relation.
+</p>
+
+<source>
+big = LOAD 'big_data' AS (b1,b2,b3);
+
+small = LOAD 'small_data' AS (s1,s2,s3);
+
+C = JOIN small BY s1 LEFT, big BY b1 USING 'bloom';
+</source>
+</section>
+
+<section>
+<title>Conditions</title>
+<ul>
+<li>Bloom join cannot be used with a FULL OUTER join.</li>
+<li>If the underlying data is sufficiently skewed, bloom join might not help. Skewed join can be considered for those cases.</li>
+</ul>
+</section>
+
+<section>
+<title>Tuning options</title>
+<p>
+There are multiple <a href="start.html#properties">pig properties</a> that can be configured to construct a more efficient bloom filter.
+See <a href="http://en.wikipedia.org/wiki/Bloom_filter">Bloom Filter</a> for a discussion of how to select the number of bits and the number of hash functions.
+An easier option is to search for "bloom filter calculator" in a search engine and use one of the available online bloom filter calculators to arrive at the desired values. A usage sketch follows the list of properties below.
+</p>
+<ul>
+<li>pig.bloomjoin.strategy - The valid values for this are 'map' and 'reduce'. Default value is map.
+Bloom join has two different kinds of implementations to be more efficient in different cases.
+In general, there is an extra reduce step in the DAG for construction of the bloom filter(s).
+<ul>
+<li>map - In each map, bloom filters are computed on the join keys partitioned by the hashcode of the key
+with pig.bloomjoin.num.filters number of partitions.
+Bloom filters for each partition from different maps are then combined in the reducers producing one bloom filter per partition.
+The default value of pig.bloomjoin.num.filters is 1 for this strategy and so usually only one bloom filter is created.
+This is efficient and fast if there is a small number of maps (&lt;10) and the number of distinct keys is not too high.
+It can be faster with a larger number of maps and even with bigger bloom vector sizes,
+ but the amount of data shuffled to the reducer for aggregation becomes huge, making it inefficient.</li>
+<li>reduce - Join keys are sent from the map to the reducer partitioned by hashcode of the key with
+pig.bloomjoin.num.filters number of partitions. In the reducers, one bloom filter is then computed per partition.
+The number of reducers is set equal to the number of partitions, allowing each bloom filter to be computed in parallel.
+The default value of pig.bloomjoin.num.filters is 11 for this strategy.
+This is efficient for larger datasets with a lot of maps or a very large bloom vector size.
+In this case the size of the keys sent to the reducers is smaller than the size of the bloom filters that would otherwise be sent for aggregation, making it efficient.</li>
+</ul>
+</li>
+<li>pig.bloomjoin.num.filters - The number of bloom filters that will be created. Default is 1 for map strategy and 11 for reduce strategy.</li>
+<li>pig.bloomjoin.vectorsize.bytes - The size in bytes of the bit vector to be used for the bloom filter.
+A bigger vector size will be needed when the number of distinct keys is higher. Default value is 1048576 (1MB).</li>
+<li>pig.bloomjoin.hash.type - The type of hash function to use. Valid values are 'jenkins' and 'murmur'. Default is murmur.</li>
+<li>pig.bloomjoin.hash.functions - The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+The higher the number, the lower the false positives. Too high a value can increase the CPU time. Default value is 3.</li>
+</ul>
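+<p>For example, the properties can be set in the script before the join (a sketch; the values shown are illustrative, not recommendations):</p>
+<source>
+SET pig.bloomjoin.strategy 'reduce';
+SET pig.bloomjoin.vectorsize.bytes 4194304;
+
+big = LOAD 'big_data' AS (b1,b2,b3);
+small = LOAD 'small_data' AS (s1,s2,s3);
+C = JOIN big BY b1, small BY s1 USING 'bloom';
+</source>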
+</section>
+
+</section>
+<!-- END BLOOM JOINS-->
+
 <!-- +++++++++++++++++++++++++++++++ -->
 <!-- SKEWED JOINS-->
 <section id="skewed-joins">

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml Fri Feb 24 08:19:42 2017
@@ -137,12 +137,15 @@
 <br></br>&nbsp;&nbsp;&nbsp; <a href="func.html#tobag">and TOBAG function</a>
 <br></br>&nbsp;&nbsp;&nbsp; <a href="basic.html#type-construction">and type construction operators</a>
 <br></br>&nbsp;&nbsp;&nbsp; <a href="func.html#bagtostring">converting to string</a>
+<br></br>&nbsp;&nbsp;&nbsp; <a href="func.html#bagtotuple">converting to tuple</a>
 <br></br>&nbsp;&nbsp;&nbsp; <a href="basic.html#schema-multi">schemas for multiple types</a>
 <br></br>&nbsp;&nbsp;&nbsp; <a href="basic.html#bag">syntax</a>
 </p>
 
 <p><a href="func.html#bagtostring">BagToString</a> function</p>
 
+<p><a href="func.html#bagtotuple">BagToTuple</a> function</p>
+
 <p><a href="start.html#batch-mode">batch mode</a>. <em>See also</em> memory management</p>
 
 <p><a href="basic.html#arithmetic">bincond operator</a> ( ?: )</p>
@@ -401,7 +404,6 @@
 <p>Hadoop
 <br></br>&nbsp;&nbsp;&nbsp; <a href="cmds.html#fs">FsShell commands</a>
 <br></br>&nbsp;&nbsp;&nbsp; <a href="basic.html#load-glob">Hadoop globbing</a>
-<br></br>&nbsp;&nbsp;&nbsp; <a href="test.html#hadoop-job-history-loader">HadoopJobHistoryLoader</a>
 <br></br>&nbsp;&nbsp;&nbsp; hadoop partitioner. <em>See</em> PARTITION BY
 <br></br>&nbsp;&nbsp;&nbsp; <a href="start.html#hadoop-properties">Hadoop properties</a>
 <br></br>&nbsp;&nbsp;&nbsp; <a href="start.html#req">versions supported</a>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/site.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/site.xml Fri Feb 24 08:19:42 2017
@@ -48,6 +48,7 @@ See http://forrest.apache.org/docs/linki
     <cmds label="Shell and Utililty Commands" href="cmds.html" />
     <perform label="Performance and Efficiency" href="perf.html" />
     <test label="Testing and Diagnostics" href="test.html" />
+    <v_editors label="Visual Editors" href="v_editors.html"/>
     <admin label="Administration" href="admin.html" />
     <index label="Index" href="pig-index.html" />
     </docs>  

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Fri Feb 24 08:19:42 2017
@@ -34,7 +34,7 @@
  <p><strong>Mandatory</strong></p>
       <p>Unix and Windows users need the following:</p>
 		<ul>
-		  <li> <strong>Hadoop 0.23.X, 1.X or 2.X</strong> - <a href="http://hadoop.apache.org/common/releases.html">http://hadoop.apache.org/common/releases.html</a> (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to point to the directory where you have installed Hadoop. If you do not set HADOOP_HOME, by default Pig will run with the embedded version, currently Hadoop 1.0.4.)</li>
+		  <li> <strong>Hadoop 2.X</strong> - <a href="http://hadoop.apache.org/common/releases.html">http://hadoop.apache.org/common/releases.html</a> (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to point to the directory where you have installed Hadoop. If you do not set HADOOP_HOME, by default Pig will run with the embedded version, currently Hadoop 2.7.3.)</li>
 		  <li> <strong>Java 1.7</strong> - <a href="http://java.sun.com/javase/downloads/index.jsp">http://java.sun.com/javase/downloads/index.jsp</a> (set JAVA_HOME to the root of your Java installation)</li>	
 		</ul>
 		<p></p>
@@ -82,7 +82,6 @@ Test the Pig installation with this simp
 	  <li> Build the code from the top directory: <code>ant</code> <br></br>
 	  If the build is successful, you should see the pig.jar file created in that directory. </li>	
 	  <li> Validate the pig.jar  by running a unit test: <code>ant test</code></li>
-	  <li> If you are using Hadoop 0.23.X or 2.X, please add -Dhadoopversion=23 in your ant command line in the previous steps</li>
      </ol>
  </section>
 </section>
@@ -558,16 +557,16 @@ However, in a production environment you
 <li>Make sure the JAVA_HOME environment variable is set the root of your Java installation.</li>
 <li>Make sure your PATH includes bin/pig (this enables you to run the tutorials using the "pig" command). 
 <source>
-$ export PATH=/&lt;my-path-to-pig&gt;/pig-0.14.0/bin:$PATH 
+$ export PATH=/&lt;my-path-to-pig&gt;/pig-0.16.0/bin:$PATH 
 </source>
 </li>
 <li>Set the PIG_HOME environment variable:
 <source>
-$ export PIG_HOME=/&lt;my-path-to-pig&gt;/pig-0.14.0 
+$ export PIG_HOME=/&lt;my-path-to-pig&gt;/pig-0.16.0 
 </source></li>
 <li>Create the pigtutorial.tar.gz file:
 <ul>
-    <li>Move to the Pig tutorial directory (.../pig-0.14.0/tutorial).</li>
+    <li>Move to the Pig tutorial directory (.../pig-0.16.0/tutorial).</li>
 	<li>Run the "ant" command from the tutorial directory. This will create the pigtutorial.tar.gz file.
 	</li>
 </ul>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml Fri Feb 24 08:19:42 2017
@@ -32,6 +32,6 @@
   -->
   <tab label="Project" href="http://hadoop.apache.org/pig/" type="visible" /> 
   <tab label="Wiki" href="http://wiki.apache.org/pig/" type="visible" /> 
-  <tab label="Pig 0.16.0 Documentation" dir="" type="visible" /> 
+  <tab label="Pig 0.17.0 Documentation" dir="" type="visible" />
 
 </tabs>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml Fri Feb 24 08:19:42 2017
@@ -540,7 +540,7 @@ job_201004271216_12714 1 1 3 3 3 12 12 1
 
 <p>Pig Statistics is a framework for collecting and storing script-level statistics for Pig Latin. Characteristics of Pig Latin scripts and the resulting MapReduce jobs are collected while the script is executed. These statistics are then available for Pig users and tools using Pig (such as Oozie) to retrieve after the job is done.</p>
 
-<p>The new Pig statistics and the existing Hadoop statistics can also be accessed via the Hadoop job history file (and job xml file). Piggybank has a HadoopJobHistoryLoader which acts as an example of using Pig itself to query these statistics (the loader can be used as a reference implementation but is NOT supported for production use).</p>
+<p>The new Pig statistics and the existing Hadoop statistics can also be accessed via the Hadoop job history file (and job xml file).</p>
 
 <!-- +++++++++++++++++++++++++++++++++++++++ -->
 <section>
@@ -548,7 +548,7 @@ job_201004271216_12714 1 1 3 3 3 12 12 1
 
 <p>Several new public classes make it easier for external tools such as Oozie to integrate with Pig statistics. </p>
 
-<p>The Pig statistics are available here: <a href="http://pig.apache.org/docs/r0.14.0/api/">http://pig.apache.org/docs/r0.14.0/api/</a></p>
+<p>The Pig statistics are available here: <a href="http://pig.apache.org/docs/r0.16.0/api/">http://pig.apache.org/docs/r0.16.0/api/</a></p>
 
 <p id="stats-classes">The stats classes are in the package: org.apache.pig.tools.pigstats</p>
 <ul>
@@ -708,93 +708,6 @@ public interface PigProgressNotification
 </tr>
 </table>
 </section>
-
-
-<!-- +++++++++++++++++++++++++++++++++++++++ -->
-<section id="hadoop-job-history-loader">
-<title>Hadoop Job History Loader</title>
-<p>The HadoopJobHistoryLoader in Piggybank loads Hadoop job history files and job xml files from file system. For each MapReduce job, the loader produces a tuple with schema (j:map[], m:map[], r:map[]). The first map in the schema contains job-related entries. Here are some of important key names in the map: </p>
-
-<table>
-<tr>
-<td>
-<p>PIG_SCRIPT_ID</p>
-<p>CLUSTER </p>
-<p>QUEUE_NAME</p>
-<p>JOBID</p>
-<p>JOBNAME</p>
-<p>STATUS</p>
-</td>
-<td>
-<p>USER </p>
-<p>HADOOP_VERSION  </p>
-<p>PIG_VERSION</p>
-<p>PIG_JOB_FEATURE</p>
-<p>PIG_JOB_ALIAS </p>
-<p>PIG_JOB_PARENTS</p>
-</td>
-<td>
-<p>SUBMIT_TIME</p>
-<p>LAUNCH_TIME</p>
-<p>FINISH_TIME</p>
-<p>TOTAL_MAPS</p>
-<p>TOTAL_REDUCES</p>
-</td>
-</tr>
-</table>
-<p></p>
-<p>Examples that use the loader to query Pig statistics are shown below.</p>
-</section>
-
-<!-- +++++++++++++++++++++++++++++++++++++++ -->
-<section>
-<title>Examples</title>
-<p>Find scripts that generate more then three MapReduce jobs:</p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = group a by (j#'PIG_SCRIPT_ID', j#'USER', j#'JOBNAME');
-c = foreach b generate group.$1, group.$2, COUNT(a);
-d = filter c by $2 > 3;
-dump d;
-</source>
-
-<p>Find the running time of each script (in seconds): </p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = foreach a generate j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'JOBNAME' as script_name, 
-         (Long) j#'SUBMIT_TIME' as start, (Long) j#'FINISH_TIME' as end;
-c = group b by (id, user, script_name)
-d = foreach c generate group.user, group.script_name, (MAX(b.end) - MIN(b.start)/1000;
-dump d;
-</source>
-
-<p>Find the number of scripts run by user and queue on a cluster: </p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = foreach a generate j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'QUEUE_NAME' as queue;
-c = group b by (id, user, queue) parallel 10;
-d = foreach c generate group.user, group.queue, COUNT(b);
-dump d;
-</source>
-
-<p>Find scripts that have failed jobs: </p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = foreach a generate (Chararray) j#'STATUS' as status, j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'JOBNAME' as script_name, j#'JOBID' as job;
-c = filter b by status != 'SUCCESS';
-dump c;
-</source>
-
-<p>Find scripts that use only the default parallelism: </p>
-<source>
-a = load '/mapred/history/done' using HadoopJobHistoryLoader() as (j:map[], m:map[], r:map[]);
-b = foreach a generate j#'PIG_SCRIPT_ID' as id, j#'USER' as user, j#'JOBNAME' as script_name, (Long) r#'NUMBER_REDUCES' as reduces;
-c = group b by (id, user, script_name) parallel 10;
-d = foreach c generate group.user, group.script_name, MAX(b.reduces) as max_reduces;
-e = filter d by max_reduces == 1;
-dump e;
-</source>
-</section>
 </section>   
 
 <!-- =========================================================================== -->

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml Fri Feb 24 08:19:42 2017
@@ -976,6 +976,14 @@ public class CurrentTime extends EvalFun
 }
 </source>
 </section>
+<section id="udf-loadcaster">
+<title>Typecasting from bytearrays</title>
+<p>Just like a <a href="#load-functions">Load Function</a> and <a href="basic.html#pig-streaming-input-output">Streaming</a>,
+a Java UDF has a getLoadCaster() method that returns a
+<a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadCaster.java?view=markup">LoadCaster</a>
+to convert byte arrays to specific types. A UDF implementation should implement this if casts (implicit or explicit) from DataByteArray fields to other types need to be supported. The default implementation returns null, and Pig will determine whether all parameters passed to the UDF have an identical LoadCaster and use it when true. </p>
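+<p>A minimal sketch (the UDF class is illustrative, not part of Pig) that returns the same caster PigStorage uses, so bytearray arguments can be cast to the declared types:</p>
+<source>
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.Tuple;
+
+public class MyUpper extends EvalFunc&lt;String&gt; {
+    @Override
+    public String exec(Tuple input) throws IOException {
+        if (input == null || input.size() == 0 || input.get(0) == null) {
+            return null;
+        }
+        return ((String) input.get(0)).toUpperCase();
+    }
+
+    @Override
+    public LoadCaster getLoadCaster() throws IOException {
+        // Utf8StorageConverter is the caster PigStorage uses for bytearray conversions
+        return new Utf8StorageConverter();
+    }
+}
+</source>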
+</section>
+
 <section id="tez-jvm-reuse">
         <title>Clean up static variable in Tez</title>
         <p>In Tez, jvm could reuse for other tasks. It is important to cleanup static variable to make sure there is no side effect. Here is one example:</p>
@@ -1038,6 +1046,9 @@ has methods to convert byte arrays to sp
 
 <li id="loadpredicatepushdown"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java?view=markup">LoadPredicatePushdown</a> 
  has the methods to push predicates to the loader. It is different than LoadMetadata.setPartitionFilter in that loader may load records which does not satisfy the predicates. In other words, predicates is only a hint. Note this interface is still in development and might change in next version. Currently only OrcStorage implements this interface.</li>
+
+<li id="nonfsloadfunc"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/NonFSLoadFunc.java?view=markup">NonFSLoadFunc</a> 
+ is a marker interface to indicate that a LoadFunc implementation is not a filesystem loader. This is useful for LoadFunc classes that, for example, supply queries instead of filesystem paths to the LOAD operator.</li>
 </ul>
 
  <p>The LoadFunc abstract class is the main class to extend for implementing a loader. The methods which need to be overridden are explained below:</p>
@@ -1192,6 +1203,8 @@ abstract class has the main methods for
 This interface has methods to interact with metadata systems to store schema and store statistics. This interface is optional and should only be implemented if metadata needs to stored. </li>
 <li id="storeresources"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreResources.java?view=markup">StoreResources:</a> 
 This interface has methods to put hdfs files or local files to distributed cache. </li>
+<li id="errorhandling"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandling.java?view=markup">ErrorHandling:</a> 
+This interface allows you to skip bad records in the storer so that the storer does not throw an exception and terminate the job. You can implement your own error handler by implementing the <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandler.java?view=markup">ErrorHandler</a> interface, or use the predefined error handler <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java?view=markup">CounterBasedErrorHandler</a>. ErrorHandling can be turned on by setting the property pig.error-handling.enabled to true in pig.properties. Default is false. CounterBasedErrorHandler uses two settings - pig.error-handling.min.error.records (the minimum number of errors to trigger error handling) and pig.error-handling.error.threshold (the fraction of bad records beyond which an error is thrown); see the example after this list.</li>
 </ul>
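+
+<p>For example, error handling can be enabled in pig.properties (a sketch; the values below are illustrative, not recommendations):</p>
+<source>
+pig.error-handling.enabled=true
+# the minimum number of bad records before error handling is triggered
+pig.error-handling.min.error.records=100
+# the fraction of bad records beyond which the job fails
+pig.error-handling.error.threshold=0.01
+</source>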
 
 <p id="storefunc-override">The methods which need to be overridden in StoreFunc are explained below: </p>

Added: pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml (added)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/v_editors.xml Fri Feb 24 08:19:42 2017
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
+
+<document>
+    <header>
+        <title>Visual Editors</title>
+    </header>
+    <body>
+
+        <!-- ====================================================================== -->
+        <!-- v_editors-->
+        <section id="zeppelin">
+            <title>Running Pig in Apache Zeppelin</title>
+
+            <p><a href="https://zeppelin.apache.org/">Apache Zeppelin</a> is a web-based notebook that enables interactive data analytics. Pig is supported as a backend interpreter in Zeppelin starting from version 0.7. </p>
+            <p>You can do everything in Zeppelin that you can do in the grunt shell. In addition, you can take advantage of Zeppelin's visualization features to visualize Pig output. Here are two links on how to configure Pig in Zeppelin and how to run Pig scripts in Zeppelin.</p>
+
+            <ul>
+                <li>
+                    <a href="https://zeppelin.apache.org/docs/latest/interpreter/pig.html">https://zeppelin.apache.org/docs/latest/interpreter/pig.html</a>
+                </li>
+                <li>
+                    <a href="https://cwiki.apache.org/confluence/display/ZEPPELIN/Running+Pig+in+Apache+Zeppelin">https://cwiki.apache.org/confluence/display/ZEPPELIN/Running+Pig+in+Apache+Zeppelin</a>
+                </li>
+            </ul>
+
+            <p><strong>Screenshot of running Pig in Zeppelin</strong> </p>
+            <p><img alt="Pig in zeppelin" src="images/pig_zeppelin.png" title="Pig in zeppelin"></img></p>
+
+        </section>
+    </body>
+</document>
\ No newline at end of file

Added: pig/branches/spark/src/docs/src/documentation/resources/images/pig_zeppelin.png
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/resources/images/pig_zeppelin.png?rev=1784237&view=auto
==============================================================================
Binary file - no diff available.

Propchange: pig/branches/spark/src/docs/src/documentation/resources/images/pig_zeppelin.png
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java (original)
+++ pig/branches/spark/src/org/apache/pig/CounterBasedErrorHandler.java Fri Feb 24 08:19:42 2017
@@ -34,10 +34,10 @@ public class CounterBasedErrorHandler im
 
     public CounterBasedErrorHandler() {
         Configuration conf = UDFContext.getUDFContext().getJobConf();
-        this.minErrors = conf.getLong(PigConfiguration.PIG_ERRORS_MIN_RECORDS,
+        this.minErrors = conf.getLong(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
                 0);
         this.errorThreshold = conf.getFloat(
-                PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, 0.0f);
+                PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT, 0.0f);
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/EvalFunc.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/EvalFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/EvalFunc.java Fri Feb 24 08:19:42 2017
@@ -369,4 +369,17 @@ public abstract class EvalFunc<T>  {
 
     public void setEndOfAllInput(boolean endOfAllInput) {
     }
+
+    /**
+     * This will be called on both the front end and the back
+     * end during execution.
+     * @return the {@link LoadCaster} associated with this eval. Returning null
+     * indicates that casts from bytearray will pick the one associated with the
+     * parameters when they all come from the same loadcaster type.
+     * @throws IOException if there is an exception during LoadCaster
+     */
+    public LoadCaster getLoadCaster() throws IOException {
+        return null;
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java Fri Feb 24 08:19:42 2017
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -47,6 +48,7 @@ public class JVMReuseImpl {
         PigGenericMapReduce.staticDataCleanup();
         PigStatusReporter.staticDataCleanup();
         PigCombiner.Combine.staticDataCleanup();
+        DistinctCombiner.Combine.staticDataCleanup();
 
         String className = null;
         String msg = null;

Modified: pig/branches/spark/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/LoadFunc.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/LoadFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/LoadFunc.java Fri Feb 24 08:19:42 2017
@@ -108,7 +108,7 @@ public abstract class LoadFunc {
     public abstract InputFormat getInputFormat() throws IOException;
 
     /**
-     * This will be called on the front end during planning and not on the back 
+     * This will be called on both the front end and the back
      * end during execution.
      * @return the {@link LoadCaster} associated with this loader. Returning null 
      * indicates that casts from byte array are not supported for this loader. 

Modified: pig/branches/spark/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/Main.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/Main.java (original)
+++ pig/branches/spark/src/org/apache/pig/Main.java Fri Feb 24 08:19:42 2017
@@ -27,7 +27,6 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.Reader;
 import java.io.StringReader;
 import java.net.URL;
@@ -45,9 +44,8 @@ import java.util.jar.Attributes;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-import jline.ConsoleReader;
-import jline.ConsoleReaderInputStream;
-import jline.History;
+import jline.console.ConsoleReader;
+import jline.console.history.FileHistory;
 
 import org.antlr.runtime.RecognitionException;
 import org.apache.commons.logging.Log;
@@ -59,6 +57,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.backend.BackendException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.classification.InterfaceAudience;
@@ -76,6 +75,7 @@ import org.apache.pig.parser.DryRunGrunt
 import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.scripting.ScriptEngine.SupportedScriptLang;
 import org.apache.pig.tools.cmdline.CmdLineParser;
+import org.apache.pig.tools.grunt.ConsoleReaderInputStream;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -100,13 +100,12 @@ import com.google.common.io.Closeables;
 public class Main {
 
     static {
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-
+        Utils.addShutdownHookWithPriority(new Runnable() {
             @Override
             public void run() {
                 FileLocalizer.deleteTempResourceFiles();
             }
-        });
+        }, PigImplConstants.SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY);
     }
 
     private final static Log log = LogFactory.getLog(Main.class);
@@ -477,7 +476,7 @@ public class Main {
                 }
 
 
-                logFileName = validateLogFile(logFileName, file);
+                logFileName = validateLogFile(logFileName, localFileRet.file);
                 pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? "": logFileName));
 
                 // Set job name based on name of the script
@@ -488,7 +487,7 @@ public class Main {
                     new File(substFile).deleteOnExit();
                 }
 
-                scriptState.setScript(new File(file));
+                scriptState.setScript(localFileRet.file);
 
                 grunt = new Grunt(pin, pigContext);
                 gruntCalled = true;
@@ -551,12 +550,13 @@ public class Main {
                 }
                 // Interactive
                 mode = ExecMode.SHELL;
-              //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
-                ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(System.in, properties), new OutputStreamWriter(System.out));
-                reader.setDefaultPrompt("grunt> ");
+                //Reader is created by first loading "pig.load.default.statements" or .pigbootup file if available
+                ConsoleReader reader = new ConsoleReader(Utils.getCompositeStream(System.in, properties), System.out);
+                reader.setExpandEvents(false);
+                reader.setPrompt("grunt> ");
                 final String HISTORYFILE = ".pig_history";
                 String historyFile = System.getProperty("user.home") + File.separator  + HISTORYFILE;
-                reader.setHistory(new History(new File(historyFile)));
+                reader.setHistory(new FileHistory(new File(historyFile)));
                 ConsoleReaderInputStream inputStream = new ConsoleReaderInputStream(reader);
                 grunt = new Grunt(new BufferedReader(new InputStreamReader(inputStream)), pigContext);
                 grunt.setConsoleReader(reader);
@@ -605,7 +605,7 @@ public class Main {
                     return ReturnCode.SUCCESS;
                 }
 
-                logFileName = validateLogFile(logFileName, remainders[0]);
+                logFileName = validateLogFile(logFileName, localFileRet.file);
                 pigContext.getProperties().setProperty("pig.logfile", (logFileName == null? "": logFileName));
 
                 if (!debug) {
@@ -660,6 +660,7 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
             }
+            killRunningJobsIfInterrupted(e, pigContext);
         } catch (Throwable e) {
             rc = ReturnCode.THROWABLE_EXCEPTION;
             PigStatsUtil.setErrorMessage(e.getMessage());
@@ -668,6 +669,7 @@ public class Main {
             if(!gruntCalled) {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
             }
+            killRunningJobsIfInterrupted(e, pigContext);
         } finally {
             if (printScriptRunTime) {
                 printScriptRunTime(startTime);
@@ -694,6 +696,22 @@ public class Main {
                 + " (" + duration.getMillis() + " ms)");
     }
 
+    private static void killRunningJobsIfInterrupted(Throwable e, PigContext pigContext) {
+        Throwable cause = e.getCause();
+        // Kill running jobs when we get an InterruptedException.
+        // The Pig thread is interrupted by mapreduce when the Oozie launcher job is killed.
+        // The shutdown hook kills running jobs, but sometimes the NodeManager can issue
+        // a SIGKILL after the AM unregisters and before the shutdown hook gets to execute,
+        // causing orphaned jobs that continue to run.
+        if (e instanceof InterruptedException || (cause != null && cause instanceof InterruptedException)) {
+            try {
+                pigContext.getExecutionEngine().kill();
+            } catch (BackendException be) {
+                log.error("Error while killing running jobs", be);
+            }
+        }
+    }
+
     protected static PigProgressNotificationListener makeListener(Properties properties) {
 
         try {
@@ -971,11 +989,10 @@ public class Main {
             System.out.println("Additionally, any Hadoop property can be specified.");
     }
 
-    private static String validateLogFile(String logFileName, String scriptName) {
+    private static String validateLogFile(String logFileName, File scriptFile) {
         String strippedDownScriptName = null;
 
-        if(scriptName != null) {
-            File scriptFile = new File(scriptName);
+        if (scriptFile != null) {
             if(!scriptFile.isDirectory()) {
                 String scriptFileAbsPath;
                 try {

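For illustration (not part of this change): a minimal, standalone sketch of the jline 2.x API that the interactive-shell setup above now uses. The class below is hypothetical; the prompt and history file mirror the Grunt settings, and jline2's FileHistory persists entries only when it is flushed.

    import java.io.File;
    import jline.console.ConsoleReader;
    import jline.console.history.FileHistory;

    public class ConsoleSketch {
        public static void main(String[] args) throws Exception {
            ConsoleReader reader = new ConsoleReader(System.in, System.out);
            reader.setExpandEvents(false);   // keep "!" literal instead of history expansion
            reader.setPrompt("grunt> ");
            FileHistory history = new FileHistory(
                    new File(System.getProperty("user.home"), ".pig_history"));
            reader.setHistory(history);
            String line;
            while ((line = reader.readLine()) != null && !line.trim().equals("quit")) {
                System.out.println("echo: " + line);
            }
            history.flush();                 // FileHistory writes to disk only on flush
        }
    }
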
Added: pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java (added)
+++ pig/branches/spark/src/org/apache/pig/NonFSLoadFunc.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+/**
+ * Marker interface to distinguish LoadFunc implementations that don't use file system sources.
+ */
+public interface NonFSLoadFunc {
+
+}

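For illustration (not part of this change): a hypothetical loader backed by a non-filesystem source would simply add the marker interface to its declaration, signalling that its location string is not a file system path. The class name and behaviour below are illustrative only.

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.pig.LoadFunc;
    import org.apache.pig.NonFSLoadFunc;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
    import org.apache.pig.data.Tuple;

    public class ExampleQueueLoader extends LoadFunc implements NonFSLoadFunc {

        @Override
        public void setLocation(String location, Job job) throws IOException {
            // "location" names a queue or table rather than an HDFS path,
            // so no path resolution or existence checks apply here.
        }

        @Override
        public InputFormat getInputFormat() throws IOException {
            return null; // a real loader would return its own InputFormat
        }

        @Override
        public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
        }

        @Override
        public Tuple getNext() throws IOException {
            return null; // null signals end of data
        }
    }
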
Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Fri Feb 24 08:19:42 2017
@@ -18,6 +18,7 @@
 
 package org.apache.pig;
 
+
 /**
  * Container for static configuration strings, defaults, etc. This is intended just for keys that can
  * be set by users, not for keys that are generally used within pig.
@@ -62,9 +63,15 @@ public class PigConfiguration {
     public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
     /**
      * These keys are used to enable or disable tez union optimization for
-     * specific StoreFuncs so that optimization is only applied to StoreFuncs
-     * that do not hard part file names and honor mapreduce.output.basename and
-     * is turned of for those that do not. Refer PIG-4649
+     * specific StoreFuncs. Optimization should be turned off for those
+     * StoreFuncs that hard code part file names and do not prefix file names
+     * with mapreduce.output.basename configuration.
+     *
+     * If the StoreFuncs implement
+     * {@link StoreFunc#supportsParallelWriteToStoreLocation()} and return true
+     * or false, then that is used to turn union optimization on or off
+     * respectively. These settings can be used for StoreFuncs that have not
+     * implemented the API yet.
      */
     public static final String PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS = "pig.tez.opt.union.supported.storefuncs";
     public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS = "pig.tez.opt.union.unsupported.storefuncs";
@@ -127,6 +134,58 @@ public class PigConfiguration {
     public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem";
 
     /**
+     * Bloom join has two different kind of implementations.
+     * <ul>
+     * <li>map <br>
+     * In each map, bloom filters are computed on the join keys partitioned by
+     * the hashcode of the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of
+     * partitions. Bloom filters from different maps are then combined in the
+     * reducer producing one bloom filter per partition. This is efficient and
+     * fast if there is a small number of maps (<10) and the number of
+     * distinct keys is not too high. It can be faster with a larger number of
+     * maps and even with bigger bloom vector sizes, but the amount of data
+     * shuffled to the reducer for aggregation becomes huge, making it
+     * inefficient.</li>
+     * <li>reduce <br>
+     * Join keys are sent from the map to the reducer partitioned by hashcode of
+     * the key with {@link #PIG_BLOOMJOIN_NUM_FILTERS} number of reducers. One
+     * bloom filter is then created per partition. This is efficient for larger
+     * datasets with a lot of maps or a very large
+     * {@link #PIG_BLOOMJOIN_VECTORSIZE_BYTES}. In this case the size of the keys sent
+     * to the reducer is smaller than the size of the bloom filters that would have to
+     * be sent for aggregation, making it efficient.</li>
+     * </ul>
+     * Default value is map.
+     */
+    public static final String PIG_BLOOMJOIN_STRATEGY = "pig.bloomjoin.strategy";
+
+    /**
+     * The number of bloom filters that will be created.
+     * Default is 1 for map strategy and 11 for reduce strategy.
+     */
+    public static final String PIG_BLOOMJOIN_NUM_FILTERS = "pig.bloomjoin.num.filters";
+
+    /**
+     * The size in bytes of the bit vector to be used for the bloom filter.
+     * A bigger vector size will be needed when the number of distinct keys is higher.
+     * Default value is 1048576 (1MB).
+     */
+    public static final String PIG_BLOOMJOIN_VECTORSIZE_BYTES = "pig.bloomjoin.vectorsize.bytes";
+
+    /**
+     * The type of hash function to use. Valid values are jenkins and murmur.
+     * Default is murmur.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_TYPE = "pig.bloomjoin.hash.type";
+
+    /**
+     * The number of hash functions to be used in bloom computation. It determines the probability of false positives.
+     * The higher the number, the lower the false positives. Too high a value can increase the CPU time.
+     * Default value is 3.
+     */
+    public static final String PIG_BLOOMJOIN_HASH_FUNCTIONS = "pig.bloomjoin.hash.functions";
+
+    /**
      * This key used to control the maximum size loaded into
      * the distributed cache when doing fragment-replicated join
      */
@@ -151,6 +210,12 @@ public class PigConfiguration {
      * This key is used to configure grace parallelism in tez. Default is true.
      */
     public static final String PIG_TEZ_GRACE_PARALLELISM = "pig.tez.grace.parallelism";
+    /**
+     * This key is used to turn off DAG recovery when auto parallelism is used.
+     * Default is false. Useful when running with Tez versions before 0.8,
+     * which have issues with auto parallelism during DAG recovery.
+     */
+    public static final String PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY = "pig.tez.auto.parallelism.disable.dag.recovery";
 
     /**
      * This key is used to configure compression for the pig input splits which
@@ -324,17 +389,17 @@ public class PigConfiguration {
     /**
      * Boolean value used to enable or disable error handling for storers
      */
-    public static final String PIG_ALLOW_STORE_ERRORS = "pig.allow.store.errors";
+    public static final String PIG_ERROR_HANDLING_ENABLED = "pig.error-handling.enabled";
 
     /**
      * Controls the minimum number of errors
      */
-    public static final String PIG_ERRORS_MIN_RECORDS = "pig.errors.min.records";
+    public static final String PIG_ERROR_HANDLING_MIN_ERROR_RECORDS = "pig.error-handling.min.error.records";
 
     /**
      * Set the threshold for percentage of errors
      */
-    public static final String PIG_ERROR_THRESHOLD_PERCENT = "pig.error.threshold.percent";
+    public static final String PIG_ERROR_HANDLING_THRESHOLD_PERCENT = "pig.error-handling.error.threshold";
 
     /**
      * Comma-delimited entries of commands/operators that must be disallowed.
@@ -411,6 +476,31 @@ public class PigConfiguration {
      */
     public static final String PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE = "pig.spill.unused.memory.threshold.size";
 
+    /**
+     * Log tracing id that upstream clients can use to correlate their logs with a Pig run
+     */
+    public static final String PIG_LOG_TRACE_ID = "pig.log.trace.id";
+
+    /**
+     * @deprecated use {@link #PIG_LOG_TRACE_ID} instead. Will be removed in Pig 0.18
+     */
+    public static final String CALLER_ID = PIG_LOG_TRACE_ID;
+
+    /**
+     * Enable ATS for Pig
+     */
+    public static final String PIG_ATS_ENABLED = "pig.ats.enabled";
+
+    /**
+     * @deprecated use {@link #PIG_ATS_ENABLED} instead. Will be removed in Pig 0.18
+     */
+    public static final String ENABLE_ATS = PIG_ATS_ENABLED;
+
+    /**
+     * If set, Pig will override tez.am.launch.cmd-opts and tez.am.resource.memory.mb with optimal values
+     */
+    public static final String PIG_TEZ_CONFIGURE_AM_MEMORY = "pig.tez.configure.am.memory";
+
     // Deprecated settings of Pig 0.13
 
     /**

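For illustration (not part of this change): one way the new bloom join keys might be set programmatically. The values are only examples (defaults are documented in the javadoc above), and the join hint in the comment assumes the bloom join support that these keys configure.

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigConfiguration;
    import org.apache.pig.PigServer;

    public class BloomJoinConfigExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // "reduce" strategy: build one bloom filter per reducer partition
            props.setProperty(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, "reduce");
            props.setProperty(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, "11");
            props.setProperty(PigConfiguration.PIG_BLOOMJOIN_VECTORSIZE_BYTES, "1048576");
            props.setProperty(PigConfiguration.PIG_BLOOMJOIN_HASH_TYPE, "murmur");
            props.setProperty(PigConfiguration.PIG_BLOOMJOIN_HASH_FUNCTIONS, "3");
            PigServer pig = new PigServer(ExecType.LOCAL, props);
            // queries registered here could then use a hint such as:
            //   C = JOIN big BY k, small BY k USING 'bloom';
            pig.shutdown();
        }
    }
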
Modified: pig/branches/spark/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigServer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigServer.java Fri Feb 24 08:19:42 2017
@@ -25,6 +25,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.io.StringReader;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -53,6 +55,7 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.PigATSClient;
 import org.apache.pig.backend.hadoop.executionengine.HJob;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.classification.InterfaceAudience;
@@ -241,6 +244,54 @@ public class PigServer {
         }
         PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
 
+        // log an ATS event that includes the caller context
+        String auditId = PigATSClient.getPigAuditId(pigContext);
+        String callerId = (String)pigContext.getProperties().get(PigConfiguration.PIG_LOG_TRACE_ID);
+        log.info("Pig Script ID for the session: " + auditId);
+        if (callerId != null) {
+            log.info("Caller ID for session: " + callerId);
+        }
+        if (Boolean.parseBoolean(pigContext.getProperties()
+                .getProperty(PigConfiguration.PIG_ATS_ENABLED))) {
+            if (Boolean.parseBoolean(pigContext.getProperties()
+                    .getProperty("yarn.timeline-service.enabled", "false"))) {
+                PigATSClient.ATSEvent event = new PigATSClient.ATSEvent(auditId, callerId);
+                try {
+                    PigATSClient.getInstance().logEvent(event);
+                } catch (Exception e) {
+                    log.warn("Error posting to ATS: ", e);
+                }
+            } else {
+                log.warn("ATS is disabled since"
+                        + " yarn.timeline-service.enabled set to false");
+            }
+
+        }
+
+        // set hdfs caller context
+        Class callerContextClass = null;
+        try {
+            callerContextClass = Class.forName("org.apache.hadoop.ipc.CallerContext");
+        } catch (ClassNotFoundException e) {
+            // If pre-Hadoop 2.8.0, skip setting CallerContext
+        }
+        if (callerContextClass != null) {
+            try {
+                // Use reflection for the following code since CallerContext is only available in Hadoop 2.8.0 and later:
+                // CallerContext hdfsContext = new CallerContext.Builder(auditId).build();
+                // CallerContext.setCurrent(hdfsContext);
+                Class callerContextBuilderClass = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder");
+                Constructor callerContextBuilderConstruct = callerContextBuilderClass.getConstructor(String.class);
+                Object builder = callerContextBuilderConstruct.newInstance(auditId);
+                Method builderBuildMethod = builder.getClass().getMethod("build");
+                Object hdfsContext = builderBuildMethod.invoke(builder);
+                Method callerContextSetCurrentMethod = callerContextClass.getMethod("setCurrent", hdfsContext.getClass());
+                callerContextSetCurrentMethod.invoke(callerContextClass, hdfsContext);
+            } catch (Exception e) {
+                // Should not happen unless the API changes in a future Hadoop version
+                throw new ExecException(e);
+            }
+        }
     }
 
     private void addHadoopProperties() throws ExecException {
@@ -612,7 +663,8 @@ public class PigServer {
             pigContext.scriptingUDFs.put(path, namespace);
         }
 
-        File f = FileLocalizer.fetchFile(pigContext.getProperties(), path).file;
+        FetchFileRet ret = FileLocalizer.fetchFile(pigContext.getProperties(), path);
+        File f = ret.file;
         if (!f.canRead()) {
             int errCode = 4002;
             String msg = "Can't read file: " + path;
@@ -621,9 +673,19 @@ public class PigServer {
         }
         String cwd = new File(".").getCanonicalPath();
         String filePath = f.getCanonicalPath();
-        //Use the relative path in the jar, if the path specified is relative
-        String nameInJar = filePath.equals(cwd + File.separator + path) ?
-                filePath.substring(cwd.length() + 1) : filePath;
+        String nameInJar = filePath;
+        // Use the relative path in the jar, if the path specified is relative
+        if (!ret.didFetch) {
+            if (!new File(path).isAbsolute() && path.indexOf("." + File.separator) == -1) {
+                // In case of Oozie, the localized files are in a different
+                // directory symlinked to the current directory. Canonical path will not point to cwd.
+                nameInJar = path;
+            } else if (filePath.equals(cwd + File.separator + path)) {
+                // If user specified absolute path and it refers to cwd
+                nameInJar = filePath.substring(cwd.length() + 1);
+            }
+        }
+
         pigContext.addScriptFile(nameInJar, filePath);
         if(scriptingLang != null) {
             ScriptEngine se = ScriptEngine.getInstance(scriptingLang);

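For illustration (not part of this change): a sketch of how an upstream client might pass the trace id that the session-start code above logs and forwards to ATS. The id value is hypothetical; posting to ATS additionally requires yarn.timeline-service.enabled to be true, as checked above.

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigConfiguration;
    import org.apache.pig.PigServer;

    public class TraceIdExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // hypothetical id handed down by a calling workflow system
            props.setProperty(PigConfiguration.PIG_LOG_TRACE_ID, "oozie-wf-1234_action-5");
            props.setProperty(PigConfiguration.PIG_ATS_ENABLED, "true");
            PigServer pig = new PigServer(ExecType.LOCAL, props);
            // register and run queries here; the trace id is logged at session start
            pig.shutdown();
        }
    }
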
Modified: pig/branches/spark/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StoreFunc.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/StoreFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/StoreFunc.java Fri Feb 24 08:19:42 2017
@@ -21,17 +21,14 @@ import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
-
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 
 /**
@@ -45,21 +42,21 @@ public abstract class StoreFunc implemen
     /**
      * This method is called by the Pig runtime in the front end to convert the
      * output location to an absolute path if the location is relative. The
-     * StoreFunc implementation is free to choose how it converts a relative 
+     * StoreFunc implementation is free to choose how it converts a relative
      * location to an absolute location since this may depend on what the location
-     * string represent (hdfs path or some other data source). 
-     *  
-     * 
+     * string represents (hdfs path or some other data source).
+     *
+     *
      * @param location location as provided in the "store" statement of the script
      * @param curDir the current working direction based on any "cd" statements
      * in the script before the "store" statement. If there are no "cd" statements
-     * in the script, this would be the home directory - 
+     * in the script, this would be the home directory -
      * <pre>/user/<username> </pre>
      * @return the absolute location based on the arguments passed
      * @throws IOException if the conversion is not possible
      */
     @Override
-    public String relToAbsPathForStoreLocation(String location, Path curDir) 
+    public String relToAbsPathForStoreLocation(String location, Path curDir)
     throws IOException {
         return LoadFunc.getAbsolutePath(location, curDir);
     }
@@ -67,32 +64,34 @@ public abstract class StoreFunc implemen
     /**
      * Return the OutputFormat associated with StoreFunc.  This will be called
      * on the front end during planning and on the backend during
-     * execution. 
+     * execution.
      * @return the {@link OutputFormat} associated with StoreFunc
-     * @throws IOException if an exception occurs while constructing the 
+     * @throws IOException if an exception occurs while constructing the
      * OutputFormat
      *
      */
+    @Override
     public abstract OutputFormat getOutputFormat() throws IOException;
 
     /**
-     * Communicate to the storer the location where the data needs to be stored.  
-     * The location string passed to the {@link StoreFunc} here is the 
-     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)} 
+     * Communicate to the storer the location where the data needs to be stored.
+     * The location string passed to the {@link StoreFunc} here is the
+     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
      * This method will be called in the frontend and backend multiple times. Implementations
      * should bear in mind that this method is called multiple times and should
      * ensure there are no inconsistent side effects due to the multiple calls.
      * {@link #checkSchema(ResourceSchema)} will be called before any call to
      * {@link #setStoreLocation(String, Job)}.
-     * 
+     *
 
-     * @param location Location returned by 
+     * @param location Location returned by
      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
      * @param job The {@link Job} object
      * @throws IOException if the location is not valid.
      */
+    @Override
     public abstract void setStoreLocation(String location, Job job) throws IOException;
- 
+
     /**
      * Set the schema for data to be stored.  This will be called on the
      * front end during planning if the store is associated with a schema.
@@ -117,21 +116,23 @@ public abstract class StoreFunc implemen
      * @param writer RecordWriter to use.
      * @throws IOException if an exception occurs during initialization
      */
+    @Override
     public abstract void prepareToWrite(RecordWriter writer) throws IOException;
 
     /**
      * Write a tuple to the data store.
-     * 
+     *
      * @param t the tuple to store.
      * @throws IOException if an exception occurs during the write
      */
+    @Override
     public abstract void putNext(Tuple t) throws IOException;
-    
+
     /**
      * This method will be called by Pig both in the front end and back end to
      * pass a unique signature to the {@link StoreFunc} which it can use to store
      * information in the {@link UDFContext} which it needs to store between
-     * various method invocations in the front end and back end. This method 
+     * various method invocations in the front end and back end. This method
      * will be called before other methods in {@link StoreFunc}.  This is necessary
      * because in a Pig Latin script with multiple stores, the different
      * instances of store functions need to be able to find their (and only their)
@@ -142,21 +143,21 @@ public abstract class StoreFunc implemen
     public void setStoreFuncUDFContextSignature(String signature) {
         // default implementation is a no-op
     }
-    
+
     /**
      * This method will be called by Pig if the job which contains this store
      * fails. Implementations can clean up output locations in this method to
      * ensure that no incorrect/incomplete results are left in the output location.
      * The default implementation  deletes the output location if it
      * is a {@link FileSystem} location.
-     * @param location Location returned by 
+     * @param location Location returned by
      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
-     * @param job The {@link Job} object - this should be used only to obtain 
+     * @param job The {@link Job} object - this should be used only to obtain
      * cluster properties through {@link Job#getConfiguration()} and not to set/query
-     * any runtime job information. 
+     * any runtime job information.
      */
     @Override
-    public void cleanupOnFailure(String location, Job job) 
+    public void cleanupOnFailure(String location, Job job)
     throws IOException {
         cleanupOnFailureImpl(location, job);
     }
@@ -166,19 +167,19 @@ public abstract class StoreFunc implemen
      * is successful, and some cleanup of intermediate resources is required.
      * Implementations can clean up output locations in this method to
      * ensure that no incorrect/incomplete results are left in the output location.
-     * @param location Location returned by 
+     * @param location Location returned by
      * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
-     * @param job The {@link Job} object - this should be used only to obtain 
+     * @param job The {@link Job} object - this should be used only to obtain
      * cluster properties through {@link Job#getConfiguration()} and not to set/query
-     * any runtime job information. 
+     * any runtime job information.
      */
     @Override
-    public void cleanupOnSuccess(String location, Job job) 
+    public void cleanupOnSuccess(String location, Job job)
     throws IOException {
         // DEFAULT: DO NOTHING, user-defined overrides can
         // call cleanupOnFailureImpl(location, job) or ...?
     }
-    
+
     /**
      * Default implementation for {@link #cleanupOnFailure(String, Job)}
      * and {@link #cleanupOnSuccess(String, Job)}.  This removes a file
@@ -187,15 +188,56 @@ public abstract class StoreFunc implemen
      * @param job Hadoop job, used to access the appropriate file system.
      * @throws IOException
      */
-    public static void cleanupOnFailureImpl(String location, Job job) 
-    throws IOException {        
+    public static void cleanupOnFailureImpl(String location, Job job)
+    throws IOException {
         Path path = new Path(location);
         FileSystem fs = path.getFileSystem(job.getConfiguration());
         if(fs.exists(path)){
             fs.delete(path, true);
-        }    
+        }
+    }
+
+    // TODO When dropping support for JDK 7 move this as a default method to StoreFuncInterface
+    /**
+     * DAG execution engines like Tez support optimizing union by writing to
+     * output location in parallel from tasks of different vertices. Commit is
+     * called once all the vertices in the union are complete. This eliminates
+     * need to have a separate phase to read data output from previous phases,
+     * union them and write out again.
+     *
+     * Enabling the union optimization requires the OutputFormat to
+     *
+     *      1) Support creation of different part file names for tasks of different
+     * vertices. Conflicting filenames can create data corruption and loss.
+     * For example, if task 0 of vertex1 and vertex2 both create a file named
+     * part-r-00000, then one of the files will be overwritten when promoting
+     * from temporary to final location leading to data loss.
+     *      FileOutputFormat has mapreduce.output.basename config which enables
+     *  naming files differently in different vertices. Classes extending
+     *  FileOutputFormat and those prefixing file names with mapreduce.output.basename
+     *  value will not encounter conflicts. Cases like HBaseStorage, which writes to a key
+     *  value store and does not produce files, should also not face any conflict.
+     *
+     *      2) Support a single commit call at the end, which takes care of promoting
+     * temporary files of the different vertices into the final location.
+     * For example, the FileOutputFormat commit algorithm handles promoting files produced
+     * by tasks of different vertices into the final output location without issues
+     * if there is no file name conflict. In cases like HBaseStorage, the
+     * TableOutputCommitter does nothing on commit.
+     *
+     * If the custom OutputFormat used by the StoreFunc does not support the above
+     * two criteria, then false should be returned. Union optimization will be
+     * disabled for the StoreFunc.
+     *
+     * The default implementation returns null, in which case the planner falls back
+     * to the {@link PigConfiguration#PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS} and
+     * {@link PigConfiguration#PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS}
+     * settings to determine if the StoreFunc supports it.
+     */
+    public Boolean supportsParallelWriteToStoreLocation() {
+        return null;
     }
-    
+
     /**
      * Issue a warning.  Warning messages are aggregated and reported to
      * the user.

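For illustration (not part of this change): a sketch of a storer opting in to the union optimization described in the new javadoc. The class is hypothetical and simply delegates to TextOutputFormat, which prefixes part file names with mapreduce.output.basename and whose committer promotes files from different vertices without conflict.

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.OutputFormat;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.pig.StoreFunc;
    import org.apache.pig.data.Tuple;

    public class ExampleTextStorer extends StoreFunc {
        private RecordWriter writer;

        @Override
        public OutputFormat getOutputFormat() throws IOException {
            // TextOutputFormat names part files using mapreduce.output.basename and
            // uses FileOutputCommitter, so parallel writes from union vertices are safe.
            return new TextOutputFormat<Object, Text>();
        }

        @Override
        public void setStoreLocation(String location, Job job) throws IOException {
            FileOutputFormat.setOutputPath(job, new Path(location));
        }

        @Override
        public void prepareToWrite(RecordWriter writer) throws IOException {
            this.writer = writer;
        }

        @Override
        public void putNext(Tuple t) throws IOException {
            try {
                writer.write(null, new Text(t.toDelimitedString("\t")));
            } catch (InterruptedException e) {
                throw new IOException(e);
            }
        }

        @Override
        public Boolean supportsParallelWriteToStoreLocation() {
            return true; // the OutputFormat above meets both criteria
        }
    }
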
Modified: pig/branches/spark/src/org/apache/pig/StreamToPig.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/StreamToPig.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/StreamToPig.java (original)
+++ pig/branches/spark/src/org/apache/pig/StreamToPig.java Fri Feb 24 08:19:42 2017
@@ -57,7 +57,7 @@ public interface StreamToPig {
     public Tuple deserialize(byte[] bytes) throws IOException;
 
     /**
-     * This will be called on the front end during planning and not on the back
+     * This will be called on both the front end and the back
      * end during execution.
      *
      * @return the {@link LoadCaster} associated with this object, or null if

Modified: pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Fri Feb 24 08:19:42 2017
@@ -183,6 +183,14 @@ public interface ExecutionEngine {
     public ExecutableManager getExecutableManager();
 
     /**
+     * This method is called when a user requests to kill all jobs
+     * associated with the execution engine.
+     *
+     * @throws BackendException
+     */
+    public void kill() throws BackendException;
+
+    /**
      * This method is called when a user requests to kill a job associated with
      * the given job id. If it is not possible for a user to kill a job, throw a
      * exception. It is imperative for the job id's being displayed to be unique