You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC

svn commit: r1642132 [3/14] - in /pig/branches/spark: ./ bin/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/ contrib/piggybank/java/sr...

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml Thu Nov 27 12:49:54 2014
@@ -22,6 +22,56 @@
   </header>
   <body> 
   
+<section id="tez-mode">
+  <title>Tez mode</title>
+  <p><a href="http://tez.apache.org">Apache Tez</a> provides an alternative execution engine than MapReduce focusing on performance. By using optimized job flow, edge semantics and container reuse, we see consistent performance boost for both large job and small job. </p>
+  <section id="enable-tez">
+    <title>How to enable Tez</title>
+    <p>To run Pig in tez mode, simply add "-x tez" in pig command line. Alternatively, you can add "exectype=tez" to conf/pig.properties to change the default exec type to Tez. Java system property "-Dexectype=tez" is also good to trigger the Tez mode.</p>
+    <p>Prerequisite: Tez requires the tez tarball to be available in hdfs while running a job on the cluster and a tez-site.xml with tez.lib.uris setting pointing to that hdfs location in classpath. Copy the tez tarball to hdfs and add the tez conf directory($TEZ_HOME/conf) containing tez-site.xml to environmental variable "PIG_CLASSPATH" if pig on tez fails with "tez.lib.uris is not defined". This is required by the Apache Pig distribution.</p>
+<source>
+  &lt;property&gt;
+    &lt;name&gt;tez.lib.uris&lt;/name&gt;
+    &lt;value&gt;${fs.default.name}/apps/tez/tez-0.5.2.tar.gz&lt;/value&gt;
+  &lt;/property&gt;
+</source>
+  </section>
+  <section id="tez-dag">
+    <title>Tez DAG generation</title>
+    <p>Every Pig script will be compiled into 1 or more Tez DAG (typically 1). Every Tez DAG consists of a number of vertices and and edges connecting vertices. For example, a simple join involves 1 DAG which consists of 3 vertices: load left input, load right input and join. Do an <a href="test.html#explain">explain</a> in Tez mode will show you the DAG Pig script compiled into.</p>
+  </section>
+  <section id="container-reuse">
+    <title>Tez session/container reuse</title>
+    <p>One downside of MapReduce is the startup cost for a job is very high. That hurts the performance especially for small job. Tez alleviate the problem by using session and container reuse, so it is not necessary to start an application master for every job, and start a JVM for every task. By default, session/container reuse is on and we usually shall not turn it off. JVM reuse might cause some side effect if static variable is used since static variable might live across different jobs. So if static variable is used in EvalFunc/LoadFunc/StoreFunc, be sure to implement a cleanup function and register with <a href="http://pig.apache.org/docs/r0.14.0/api/org/apache/pig/JVMReuseManager.html">JVMReuseManager</a>.</p>
+  </section>
+  <section id="auto-parallelism">
+    <title>Automatic parallelism</title>
+    <p>Just like MapReduce, if user specify "parallel" in their Pig statement, or user define default_parallel in Tez mode, Pig will honor it (the only exception is if user specify a parallel which is apparently too low, Pig will override it) </p>
+    <p>If user specify neither "parallel" or "default_parallel", Pig will use automatic parallelism. In MapReduce, Pig submit one MapReduce job a time and before submiting a job, Pig has chance to automatically set reduce parallelism based on the size of input file. On the contrary, Tez submit a DAG as a unit and automatic parallelism is managed in two parts</p>
+    <ul>
+    <li>Before submiting a DAG, Pig estimate parallelism of each vertex statically based on the input file size of the DAG and the complexity of the pipeline of each vertex</li>
+    <li>At runtime, Tez adjust vertex parallelism dynamically based on the input data volume of the vertex. Note currently Tez can only decrease the parallelism dynamically not increase. So in step 1, Pig overestimate the parallelism</li>
+    </ul>
+    <p>The following parameter control the behavior of automatic parallelism in Tez (share with MapReduce):</p>
+<source>
+pig.exec.reducers.bytes.per.reducer
+pig.exec.reducers.max
+</source>
+  </section>
+  <section id="api-change">
+    <title>API change</title>
+    <p>If invoking Pig in Java, there is change in PigStats and PigProgressNotificationListener if using PigRunner.run(), check <a href="test.html#pig-statistics">Pig Statistics</a> and <a href="test.html#ppnl">Pig Progress Notification Listener</a></p>
+  </section>
+  <section id="known-issue">
+    <title>Known issues</title>
+    <p>Currently known issue in Tez mode includes:</p>
+    <ul>
+    <li>Tez local mode is not stable, we see job hang in some cases</li>
+    <li>Tez specific GUI is not available yet, there is no GUI to track task progress. However, log message is available in GUI</li>
+    </ul>
+  </section>
+</section>
+
 <section id="profiling">
   <title>Timing your UDFs</title>
   <p>The first step to improving performance and efficiency is measuring where the time is going. Pig provides a light-weight method for approximately measuring how much time is spent in different user-defined functions (UDFs) and Loaders. Simply set the pig.udf.profile property to true. This will cause new counters to be tracked for all Map-Reduce jobs generated by your script: approx_microsecs measures the approximate amount of time spent in a UDF, and approx_invocations measures the approximate number of times the UDF was invoked. In addition, the frequency of profiling can be configured via the pig.udf.profile.frequency (by default, every 100th invocation). Note that this may produce a large number of counters (two per UDF). Excessive amounts of counters can lead to poor JobTracker performance, so use this feature carefully, and preferably on a test cluster.</p>
@@ -188,7 +238,7 @@ reducers. The maximum number of reducers
 <p>
 The default reducer estimation algorithm described above can be overridden by setting the
 pig.exec.reducer.estimator parameter to the fully qualified class name of an implementation of
-<a href="http://svn.apache.org/repos/asf/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java">org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator</a>.
+<a href="http://svn.apache.org/repos/asf/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigReducerEstimator.java">org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator</a>(MapReduce) or <a href="http://svn.apache.org/repos/asf/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java">org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezOperDependencyParallelismEstimator</a>(Tez).
 The class must exist on the classpath of the process submitting the Pig job. If the
 pig.exec.reducer.estimator.arg parameter is set, the value will be passed to a constructor
 of the implementing class that takes a single String.
@@ -513,9 +563,24 @@ A = LOAD 'input' as (dt, state, event) u
 </section>
 
 <!-- +++++++++++++++++++++++++++++++ -->
-<section id="FilterLogicExpressionSimplifier">
-<title>FilterLogicExpressionSimplifier</title>
-<p>This rule simplifies the expression in filter statement.</p>
+<section id="PredicatePushdownOptimizer">
+<title>PredicatePushdownOptimizer</title>
+<p>Push the filter condition to loader. Different than PartitionFilterOptimizer, the filter condition will be evaluated in Pig. In other words, the filter condition pushed to the loader is a hint. Loader might still load records which does not satisfy filter condition.</p>
+<source>
+A = LOAD 'input' using OrcStorage();
+B = FILTER A BY dt=='201310' AND state=='CA';
+</source>
+<p>Filter condition will be pushed to loader if loader supports</p>
+<source>
+A = LOAD 'input' using OrcStorage();  -- Filter condition push to loader
+B = FILTER A BY dt=='201310' AND state=='CA';  -- Filter evaluated in Pig again
+</source>
+</section>
+
+<!-- +++++++++++++++++++++++++++++++ -->
+<section id="ConstantCalculator">
+<title>ConstantCalculator</title>
+<p>This rule evaluates constant expression.</p>
 <source>
 1) Constant pre-calculation 
 
@@ -523,39 +588,12 @@ B = FILTER A BY a0 &gt; 5+7; 
 is simplified to 
 B = FILTER A BY a0 &gt; 12; 
 
-2) Elimination of negations 
-
-B = FILTER A BY NOT (NOT(a0 &gt; 5) OR a &gt; 10); 
-is simplified to 
-B = FILTER A BY a0 &gt; 5 AND a &lt;= 10; 
-
-3) Elimination of logical implied expression in AND 
-
-B = FILTER A BY (a0 &gt; 5 AND a0 &gt; 7); 
-is simplified to 
-B = FILTER A BY a0 &gt; 7; 
-
-4) Elimination of logical implied expression in OR 
-
-B = FILTER A BY ((a0 &gt; 5) OR (a0 &gt; 6 AND a1 &gt; 15); 
-is simplified to 
-B = FILTER C BY a0 &gt; 5; 
-
-5) Equivalence elimination 
+2) Evaluate UDF
 
-B = FILTER A BY (a0 v 5 AND a0 &gt; 5); 
+B = FOREACH A generate UPPER(CONCAT('a', 'b'));
 is simplified to 
-B = FILTER A BY a0 &gt; 5; 
-
-6) Elimination of complementary expressions in OR 
-
-B = FILTER A BY (a0 &gt; 5 OR a0 &lt;= 5); 
-is simplified to non-filtering 
-
-7) Elimination of naive TRUE expression 
+B = FOREACH A generate 'AB';
 
-B = FILTER A BY 1==1; 
-is simplified to non-filtering 
 </source>
 </section>
 

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml Thu Nov 27 12:49:54 2014
@@ -964,6 +964,8 @@
 
 <p><a href="func.html#strsplit">STRSPLIT</a> function</p>
 
+<p><a href="func.html#strsplittobag">STRSPLITTOBAG</a> function</p>
+
 <p><a href="func.html#substring">SUBSTRING</a> function</p>
 
 <p><a href="func.html#sum">SUM</a> function</p>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Thu Nov 27 12:49:54 2014
@@ -34,21 +34,14 @@
  <p><strong>Mandatory</strong></p>
       <p>Unix and Windows users need the following:</p>
 		<ul>
-		  <li> <strong>Hadoop 0.20.2, 020.203, 020.204,  0.20.205, 1.0.0, 1.0.1, or 0.23.0, 0.23.1</strong> - <a href="http://hadoop.apache.org/common/releases.html">http://hadoop.apache.org/common/releases.html</a> (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to point to the directory where you have installed Hadoop. If you do not set HADOOP_HOME, by default Pig will run with the embedded version, currently Hadoop 1.0.0.)</li>
-		  <li> <strong>Java 1.6</strong> - <a href="http://java.sun.com/javase/downloads/index.jsp">http://java.sun.com/javase/downloads/index.jsp</a> (set JAVA_HOME to the root of your Java installation)</li>	
+		  <li> <strong>Hadoop 0.23.X, 1.X or 2.X</strong> - <a href="http://hadoop.apache.org/common/releases.html">http://hadoop.apache.org/common/releases.html</a> (You can run Pig with different versions of Hadoop by setting HADOOP_HOME to point to the directory where you have installed Hadoop. If you do not set HADOOP_HOME, by default Pig will run with the embedded version, currently Hadoop 1.0.4.)</li>
+		  <li> <strong>Java 1.7</strong> - <a href="http://java.sun.com/javase/downloads/index.jsp">http://java.sun.com/javase/downloads/index.jsp</a> (set JAVA_HOME to the root of your Java installation)</li>	
 		</ul>
 		<p></p>
-	<p>Windows users also need to install Cygwin and the Perl package: <a href="http://www.cygwin.com/"> http://www.cygwin.com/</a></p>
-
-<p></p>
  <p><strong>Optional</strong></p>
  		<ul>
-          <li> <strong>Python 2.5</strong> - <a href="http://jython.org/downloads.html">http://jython.org/downloads.html</a> (when using Python UDFs or embedding Pig in Python) </li>
-          <li> <strong>JavaScript 1.7</strong> - <a href="https://developer.mozilla.org/en/Rhino_downloads_archive">https://developer.mozilla.org/en/Rhino_downloads_archive</a> and <a href="http://mirrors.ibiblio.org/pub/mirrors/maven2/rhino/js/">http://mirrors.ibiblio.org/pub/mirrors/maven2/rhino/js/</a>  (when using JavaScript UDFs or embedding Pig in JavaScript) </li>		  
-          <li> <strong>JRuby 1.6.7</strong> - <a href="http://www.jruby.org/download">http://www.jruby.org/download</a> (when using JRuby UDFs) </li>
-          <li> <strong>Groovy (<em>groovy-all</em>) 1.8.6</strong> - <a href="http://groovy.codehaus.org/Download">http://groovy.codehaus.org/Download</a> or directly on a maven repo <a href="http://mirrors.ibiblio.org/pub/mirrors/maven2/org/codehaus/groovy/groovy-all/1.8.6/">http://mirrors.ibiblio.org/pub/mirrors/maven2/org/codehaus/groovy/groovy-all/1.8.6/</a> (when using Groovy UDFs or embedding Pig in Groovy) </li>
-		  <li> <strong>Ant 1.7</strong> - <a href="http://ant.apache.org/">http://ant.apache.org/</a> (for builds) </li>
-		  <li> <strong>JUnit 4.5</strong> - <a href="http://junit.sourceforge.net/">http://junit.sourceforge.net/</a> (for unit tests) </li>
+          <li> <strong>Python 2.7</strong> - <a href="http://jython.org/downloads.html">https://www.python.org</a> (when using Streaming Python UDFs) </li>
+          <li> <strong>Ant 1.8</strong> - <a href="http://ant.apache.org/">http://ant.apache.org/</a> (for builds) </li>
 		</ul>
  
   </section>         
@@ -89,6 +82,7 @@ Test the Pig installation with this simp
 	  <li> Build the code from the top directory: <code>ant</code> <br></br>
 	  If the build is successful, you should see the pig.jar file created in that directory. </li>	
 	  <li> Validate the pig.jar  by running a unit test: <code>ant test</code></li>
+	  <li> If you are using Hadoop 0.23.X or 2.X, please add -Dhadoopversion=23 in your ant command line in the previous steps</li>
      </ol>
  </section>
 </section>
@@ -103,16 +97,22 @@ Test the Pig installation with this simp
 	<tr>
 	<td></td>
     <td><strong>Local Mode</strong></td>
+    <td><strong>Tez Local Mode</strong></td>
     <td><strong>Mapreduce Mode</strong></td>
+    <td><strong>Tez Mode</strong></td>
 	</tr>
 	<tr>
 	<td><strong>Interactive Mode </strong></td>
     <td>yes</td>
+    <td>experimental</td>
+    <td>yes</td>
     <td>yes</td>
 	</tr>
 	<tr>
 	<td><strong>Batch Mode</strong> </td>
     <td>yes</td>
+    <td>experimental</td>
+    <td>yes</td>
     <td>yes</td>
 	</tr>
 	</table>
@@ -122,10 +122,15 @@ Test the Pig installation with this simp
 	<title>Execution Modes</title> 
 <p>Pig has two execution modes or exectypes: </p>
 <ul>
-<li><strong>Local Mode</strong> - To run Pig in local mode, you need access to a single machine; all files are installed and run using your local host and file system. Specify local mode using the -x flag (pig -x local). Note that local mode does not support parallel mapper execution with Hadoop 0.20.x and 1.0.0. This is because the LocalJobRunner of these Hadoop versions is not thread-safe.
+<li><strong>Local Mode</strong> - To run Pig in local mode, you need access to a single machine; all files are installed and run using your local host and file system. Specify local mode using the -x flag (pig -x local).
+</li>
+<li><strong>Tez Local Mode</strong> - To run Pig in tez local mode. It is similar to local mode, except internally Pig will invoke tez runtime engine. Specify Tez local mode using the -x flag (pig -x tez_local).
+<p><strong>Note:</strong> Tez local mode is experimental. There are some queries which just error out on bigger data in local mode.</p>
 </li>
 <li><strong>Mapreduce Mode</strong> - To run Pig in mapreduce mode, you need access to a Hadoop cluster and HDFS installation. Mapreduce mode is the default mode; you can, <em>but don't need to</em>, specify it using the -x flag (pig OR pig -x mapreduce).
 </li>
+<li><strong>Tez Mode</strong> - To run Pig in Tez mode, you need access to a Hadoop cluster and HDFS installation. Specify Tez mode using the -x flag (-x tez).
+</li>
 </ul>
 <p></p>
 
@@ -141,23 +146,16 @@ Test the Pig installation with this simp
 /* local mode */
 $ pig -x local ...
  
+/* Tez local mode */
+$ pig -x tez_local ...
  
 /* mapreduce mode */
 $ pig ...
 or
 $ pig -x mapreduce ...
-</source>
-
-<p>This example shows how to run Pig in local and mapreduce mode using the java command.</p>
-<source>
-/* local mode */
-$ java -cp pig.jar org.apache.pig.Main -x local ...
-
 
-/* mapreduce mode */
-$ java -cp pig.jar org.apache.pig.Main ...
-or
-$ java -cp pig.jar org.apache.pig.Main -x mapreduce ...
+/* Tez mode */
+$ pig -x tez ...
 </source>
 
 </section>
@@ -185,6 +183,13 @@ $ pig -x local
 grunt> 
 </source>
 
+<p><strong>Tez Local Mode</strong></p>
+<source>
+$ pig -x tez_local
+... - Connecting to ...
+grunt> 
+</source>
+
 <p><strong>Mapreduce Mode</strong> </p>
 <source>
 $ pig -x mapreduce
@@ -197,6 +202,13 @@ $ pig 
 ... - Connecting to ...
 grunt> 
 </source>
+
+<p><strong>Tez Mode</strong> </p>
+<source>
+$ pig -x tez
+... - Connecting to ...
+grunt> 
+</source>
 </section>
 </section>
 
@@ -222,12 +234,20 @@ store B into ‘id.out’;  -- wri
 <source>
 $ pig -x local id.pig
 </source>
+<p><strong>Tez Local Mode</strong></p>
+<source>
+$ pig -x tez_local id.pig
+</source>
 <p><strong>Mapreduce Mode</strong> </p>
 <source>
 $ pig id.pig
 or
 $ pig -x mapreduce id.pig
 </source>
+<p><strong>Tez Mode</strong> </p>
+<source>
+$ pig -x tez id.pig
+</source>
 </section>
 
   <!-- ==================================================================== -->
@@ -424,7 +444,7 @@ However, in a production environment you
 <p id="pig-properties">To specify Pig properties use one of these mechanisms:</p>
 <ul>
 	<li>The pig.properties file (add the directory that contains the pig.properties file to the classpath)</li>
-	<li>The -D command line option and a Pig property (pig -Dpig.tmpfilecompression=true)</li>
+	<li>The -D and a Pig property in PIG_OPTS environment variable (export PIG_OPTS=-Dpig.tmpfilecompression=true)</li>
 	<li>The -P command line option and a properties file (pig -P mypig.properties)</li>
 	<li>The <a href="cmds.html#set">set</a> command (set pig.exec.nocombiner true)</li>
 </ul>
@@ -434,7 +454,7 @@ However, in a production environment you
 <p id="hadoop-properties">To specify Hadoop properties you can use the same mechanisms:</p>
 <ul>
 	<li>Hadoop configuration files (include pig-cluster-hadoop-site.xml)</li>
-	<li>The -D command line option and a Hadoop property (pig –Dmapreduce.task.profile=true) </li>
+	<li>The -D and a Hadoop property in PIG_OPTS environment variable (export PIG_OPTS=–Dmapreduce.task.profile=true) </li>
 	<li>The -P command line option and a property file (pig -P property_file)</li>
 	<li>The <a href="cmds.html#set">set</a> command (set mapred.map.tasks.speculative.execution false)</li>
 </ul>
@@ -450,7 +470,7 @@ However, in a production environment you
   <section id="tutorial">
 <title>Pig Tutorial </title>
 
-<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode and mapreduce mode (see <a href="#execution-modes">Execution Modes</a>).</p>
+<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode, mapreduce mode and Tez mode (see <a href="#execution-modes">Execution Modes</a>).</p>
 
 <p>To get started, do the following preliminary tasks:</p>
 
@@ -458,22 +478,16 @@ However, in a production environment you
 <li>Make sure the JAVA_HOME environment variable is set the root of your Java installation.</li>
 <li>Make sure your PATH includes bin/pig (this enables you to run the tutorials using the "pig" command). 
 <source>
-$ export PATH=/&lt;my-path-to-pig&gt;/pig-0.9.0/bin:$PATH 
+$ export PATH=/&lt;my-path-to-pig&gt;/pig-0.14.0/bin:$PATH 
 </source>
 </li>
 <li>Set the PIG_HOME environment variable:
 <source>
-$ export PIG_HOME=/&lt;my-path-to-pig&gt;/pig-0.9.0 
+$ export PIG_HOME=/&lt;my-path-to-pig&gt;/pig-0.14.0 
 </source></li>
 <li>Create the pigtutorial.tar.gz file:
 <ul>
-    <li>Move to the Pig tutorial directory (.../pig-0.9.0/tutorial).</li>
-	<li>Edit the build.xml file in the tutorial directory. 
-<source>
-Change this:   &lt;property name="pigjar" value="../pig.jar" /&gt;
-To this:       &lt;property name="pigjar" value="../pig-0.9.0-core.jar" /&gt;
-</source>
-	</li>
+    <li>Move to the Pig tutorial directory (.../pig-0.14.0/tutorial).</li>
 	<li>Run the "ant" command from the tutorial directory. This will create the pigtutorial.tar.gz file.
 	</li>
 </ul>
@@ -503,8 +517,12 @@ $ tar -xzf pigtutorial.tar.gz
 <source>
 $ pig -x local script1-local.pig
 </source>
+Or if you are using Tez local mode:
+<source>
+$ pig -x tez_local script1-local.pig
+</source>
 </li>
-<li>Review the result files, located in the part-r-00000 directory.
+<li>Review the result files, located in the script1-local-results.txt directory.
 <p>The output may contain a few Hadoop warnings which can be ignored:</p>
 <source>
 2010-04-08 12:55:33,642 [main] INFO  org.apache.hadoop.metrics.jvm.JvmMetrics 
@@ -516,7 +534,7 @@ $ pig -x local script1-local.pig
 
  <!-- ++++++++++++++++++++++++++++++++++ --> 
 <section>
-<title> Running the Pig Scripts in Mapreduce Mode</title>
+<title> Running the Pig Scripts in Mapreduce Mode or Tez Mode</title>
 
 <p>To run the Pig scripts in mapreduce mode, do the following: </p>
 <ol>
@@ -531,6 +549,10 @@ $ hadoop fs –copyFromLocal excite.l
 <source>
 export PIG_CLASSPATH=/mycluster/conf
 </source>
+<p>If you are using Tez, you will also need to put Tez configuration directory (the directory that contains the tez-site.xml):</p>
+<source>
+export PIG_CLASSPATH=/mycluster/conf:/tez/conf
+</source>
 <p><strong>Note:</strong> The PIG_CLASSPATH can also be used to add any other 3rd party dependencies or resource files a pig script may require. If there is also a need to make the added entries take the highest precedence in the Pig JVM's classpath order, one may also set the env-var PIG_USER_CLASSPATH_FIRST to any value, such as 'true' (and unset the env-var to disable).</p></li>
 <li>Set the HADOOP_CONF_DIR environment variable to the location of the cluster configuration directory:
 <source>
@@ -541,6 +563,10 @@ export HADOOP_CONF_DIR=/mycluster/conf
 <source>
 $ pig script1-hadoop.pig
 </source>
+Or if you are using Tez:
+<source>
+$ pig -x tez script1-hadoop.pig
+</source>
 </li>
 
 <li>Review the result files, located in the script1-hadoop-results or script2-hadoop-results HDFS directory:

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml Thu Nov 27 12:49:54 2014
@@ -32,6 +32,6 @@
   -->
   <tab label="Project" href="http://hadoop.apache.org/pig/" type="visible" /> 
   <tab label="Wiki" href="http://wiki.apache.org/pig/" type="visible" /> 
-  <tab label="Pig 0.12.0 Documentation" dir="" type="visible" /> 
+  <tab label="Pig 0.14.0 Documentation" dir="" type="visible" /> 
 
 </tabs>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml Thu Nov 27 12:49:54 2014
@@ -335,6 +335,21 @@ Local Rearrange[tuple]{chararray}(false)
 |   Project[chararray][0] - xxx-Fri Dec 05 19:42:29 UTC 2008-35
  <em>etc ... </em> 
 
+If you are running in Tez mode, Map Reduce Plan will be replaced with Tez Plan:
+
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: PigLatin:185.pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-21	->	Tez vertex scope-22,
+Tez vertex scope-22
+
+Tez vertex scope-21
+# Plan on vertex
+B: Local Rearrange[tuple]{chararray}(false) - scope-35	->	 scope-22
+ <em>etc ... </em> 
 </source> 
  </section></section>
   
@@ -505,7 +520,7 @@ grunt> illustrate -script visits.pig
 <!-- =========================================================================== -->
 <!-- DIAGNOSTIC OPERATORS -->    
 <section id="mapreduce-job-ids">
-<title>Pig Scripts and MapReduce Job IDs</title>
+<title>Pig Scripts and MapReduce Job IDs (MapReduce mode only)</title>
    <p>Complex Pig scripts often generate many MapReduce jobs. To help you debug a script, Pig prints a summary of the execution that shows which relations (aliases) are mapped to each MapReduce job. </p>
 <source>
 JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime 
@@ -533,12 +548,17 @@ job_201004271216_12714 1 1 3 3 3 12 12 1
 
 <p>Several new public classes make it easier for external tools such as Oozie to integrate with Pig statistics. </p>
 
-<p>The Pig statistics are available here: <a href="http://pig.apache.org/docs/r0.9.0/api/">http://pig.apache.org/docs/r0.9.0/api/</a></p>
+<p>The Pig statistics are available here: <a href="http://pig.apache.org/docs/r0.14.0/api/">http://pig.apache.org/docs/r0.14.0/api/</a></p>
 
 <p id="stats-classes">The stats classes are in the package: org.apache.pig.tools.pigstats</p>
 <ul>
 <li>PigStats</li>
+<li>SimplePigStats</li>
+<li>EmbeddedPigStats</li>
 <li>JobStats</li>
+<li>TezPigScriptStats</li>
+<li>TezDAGStats</li>
+<li>TezVertexStats</li>
 <li>OutputStats</li>
 <li>InputStats</li>
 </ul>
@@ -572,6 +592,8 @@ public interface PigProgressNotification
     public void launchCompletedNotification(int numJobsSucceeded);
 }
 </source>
+<p>Depends on the type of the pig script, PigRunner.run() returns a particular subclass of PigStats: SimplePigStats(MapReduce/local mode), TezPigScriptStats(Tez/Tez local mode) or EmbeddedPigStats(embedded script). SimplePigStats contains a map of JobStats which capture the stats for each MapReduce job of the Pig script. TezPigScriptStats contains a map of TezDAGStats which capture the stats for each Tez DAG of the Pig script, and TezDAGStats contains a map of TezVertexStats which capture the stats for each vertex within the Tez DAG. Depending on the execution type, EmbeddedPigStats contains a map of SimplePigStats or TezPigScriptStats, which captures the Pig job launched in the embeded script. </p>
+<p>If one is running Pig in Tez mode (or both Tez/MapReduce mode), should pass PigTezProgressNotificationListener which extends PigProgressNotificationListener to PigRunner.run() to make sure to get notification in both Tez mode or MapReduce mode. </p>
 </section>
 
 <!-- +++++++++++++++++++++++++++++++++++++++ -->
@@ -828,7 +850,7 @@ $pig_trunk ant pigunit-jar   
 
 <!-- +++++++++++++++++++++++++++++++++++++++ -->
     <section>
-      <title>Mapreduce Mode</title>
+      <title>Other Modes</title>
       <p>PigUnit also runs in Pig's mapreduce/tez/tez_local mode. Mapreduce/Tez mode requires you to use a Hadoop cluster and HDFS installation.
         It is enabled when the Java system property pigunit.exectype is set to specific values (mr/tez/tez_local): e.g. -Dpigunit.exectype=mr or System.getProperties().setProperty("pigunit.exectype", "mr"), which means PigUnit will run in mr mode. The cluster you select to run mr/tez test must be specified in the CLASSPATH (similar to the HADOOP_CONF_DIR variable). 
       </p>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml Thu Nov 27 12:49:54 2014
@@ -71,10 +71,10 @@ DUMP B;
 </source>
 
 <p>The command below can be used to run the script. Note that all examples in this document run in local mode for simplicity 
-but the examples can also run in Hadoop mode. For more information on how to run Pig, please see the PigTutorial. </p>
+but the examples can also run in Tez local/Mapreduce/ Tez mode. For more information on how to run Pig, please see the PigTutorial. </p>
 
 <source>
-java -cp pig.jar org.apache.pig.Main -x local myscript.pig
+pig -x local myscript.pig
 </source>
 
 <p>The first line of the script provides the location of the <code>jar&nbsp;file</code> that contains the UDF. 
@@ -441,6 +441,38 @@ Java Class
 </tr>
 <tr>
 <td>
+<p> boolean </p>
+</td>
+<td>
+<p> Boolean </p>
+</td>
+</tr>
+<tr>
+<td>
+<p> datetime </p>
+</td>
+<td>
+<p> DateTime </p>
+</td>
+</tr>
+<tr>
+<td>
+<p> bigdecimal </p>
+</td>
+<td>
+<p> BigDecimal </p>
+</td>
+</tr>
+<tr>
+<td>
+<p> biginteger </p>
+</td>
+<td>
+<p> BigInteger </p>
+</td>
+</tr>
+<tr>
+<td>
 <p> tuple </p>
 </td>
 <td>
@@ -583,8 +615,11 @@ public class DataType {
     public static final byte LONG      =  15;
     public static final byte FLOAT     =  20;
     public static final byte DOUBLE    =  25;
+    public static final byte DATETIME  =  30;
     public static final byte BYTEARRAY =  50;
     public static final byte CHARARRAY =  55;
+    public static final byte BIGINTEGER =  65;
+    public static final byte BIGDECIMAL =  70;
     public static final byte MAP       = 100;
     public static final byte TUPLE     = 110;
     public static final byte BAG       = 120;
@@ -820,6 +855,50 @@ public SchemaType getSchemaType() {
 <p>For an example see <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/CONCAT.java?view=markup">CONCAT</a>.</p>
 </section>
 
+<section id="counters">
+<title>Using Counters</title>
+<p>Hadoop counters are easily accessible within EvalFunc by using PigStatusReporter object. Here is one example:</p>
+<source>
+public class UPPER extends EvalFunc&lt;String&gt;
+{
+        public String exec(Tuple input) throws IOException {
+                if (input == null || input.size() == 0) {
+                    PigStatusReporter reporter = PigStatusReporter.getInstance();
+                    if (reporter != null) {
+                       reporter.incrCounter(PigWarning.UDF_WARNING_1, 1);
+                    }
+                    return null;
+                }
+                try{
+                        String str = (String)input.get(0);
+                        return str.toUpperCase();
+                }catch(Exception e){
+                    throw new IOException("Caught exception processing input row ", e);
+                }
+        }
+}
+</source>
+</section>
+<section id="access-input-schema">
+        <title>Access input schema inside EvalFunc</title>
+        <p>Not only inside outputSchema at compile time, input schema is also accessible in exec at runtime. For example:</p>
+<source>
+public class AddSchema extends EvalFunc&lt;String&gt;
+{
+        public String exec(Tuple input) throws IOException {
+                if (input == null || input.size() == 0)
+                    return null;
+                String result = "";
+                for (int i=0;i&lt;input.size();i++) {
+                    result += getInputSchema().getFields().get(i).alias;
+                    result += ":";
+                    result += input.get(i);
+                }
+                return result;
+        }
+}
+</source>
+</section>
 <!-- +++++++++++++++++++++++++++++++++++++++++++++++++ -->
 <section id="reporting-progress">
 <title>Reporting Progress</title>
@@ -849,21 +928,32 @@ public class UPPER extends EvalFunc&lt;S
 <!-- +++++++++++++++++++++++++++++++++++++++++++++++++ -->
 <section id="distributed-cache">
 	<title>Using Distributed Cache</title>
-	<p>Use getCacheFiles, an EvalFunc method, to return a list of HDFS files that need to be shipped to distributed cache. Inside exec method, you can assume that these files already exist in distributed cache. For example:</p>
+	<p>Use getCacheFiles or getShipFiles to return a list of HDFS files or local files that need to be shipped to distributed cache. Inside exec method, you can assume that these files already exist in distributed cache. For example:</p>
 <source>
 public class Udfcachetest extends EvalFunc&lt;String&gt; { 
 
     public String exec(Tuple input) throws IOException { 
-        FileReader fr = new FileReader("./smallfile"); 
-        BufferedReader d = new BufferedReader(fr); 
-        return d.readLine(); 
+        String concatResult = "";
+        FileReader fr = new FileReader("./smallfile1"); 
+        BufferedReader d = new BufferedReader(fr);
+        concatResult +=d.readLine();
+        fr = new FileReader("./smallfile2");
+        d = new BufferedReader(fr);
+        concatResult +=d.readLine();
+        return concatResult;
     } 
 
     public List&lt;String&gt; getCacheFiles() { 
         List&lt;String&gt; list = new ArrayList&lt;String&gt;(1); 
-        list.add("/user/pig/tests/data/small#smallfile"); 
+        list.add("/user/pig/tests/data/small#smallfile1");  // This is hdfs file
         return list; 
     } 
+
+    public List&lt;String&gt; getShipFiles() {
+        List&lt;String&gt; list = new ArrayList&lt;String&gt;(1);
+        list.add("/home/hadoop/pig/smallfile2");  // This local file
+        return list;
+    }
 } 
 
 a = load '1.txt'; 
@@ -871,7 +961,45 @@ b = foreach a generate Udfcachetest(*); 
 dump b;
 </source>
 </section>
-
+<section id="compile-time-eval">
+        <title>Compile time evaluation</title>
+        <p>If the parameters of the EvalFunc are all constants, Pig could evaluate the result at compile time. The benefit of evaluating at compile time is performance optimization, and enable certain other optimizations at front end (such as partition pruning, which only allow constant not UDF in filter condition). By default, compile time evaluation is disabled in EvalFunc to prevent potential side effect. To enable it, override allowCompileTimeCalculation. For example:</p>
+<source>
+public class CurrentTime extends EvalFunc&lt;DateTime&gt; {
+    public String exec(Tuple input) throws IOException {
+        return new DateTime();
+    }
+    @Override
+    public boolean allowCompileTimeCalculation() {
+        return true;
+    }
+}
+</source>
+</section>
+<section id="tez-jvm-reuse">
+        <title>Clean up static variable in Tez</title>
+        <p>In Tez, jvm could reuse for other tasks. It is important to cleanup static variable to make sure there is no side effect. Here is one example:</p>
+<source>
+public class UPPER extends EvalFunc&lt;String&gt;
+{
+        static boolean initialized = false;
+        static {
+            JVMReuseManager.getInstance().registerForStaticDataCleanup(UPPER.class);
+        }
+        public String exec(Tuple input) throws IOException {
+            if (!initialized) {
+                init();
+                initialized = true;
+            }
+            ......
+        }
+        @StaticDataCleanup
+        public static void staticDataCleanup() {
+            initialized = false;
+        }
+}
+</source>
+</section>
 </section>
 
 <!-- =============================================================== -->
@@ -907,6 +1035,9 @@ has methods to push operations from Pig 
 
 <li id="loadcaster"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadCaster.java?view=markup">LoadCaster</a> 
 has methods to convert byte arrays to specific types. A loader implementation should implement this if casts (implicit or explicit) from DataByteArray fields to other types need to be supported. </li>
+
+<li id="loadpredicatepushdown"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadPredicatePushdown.java?view=markup">LoadPredicatePushdown</a> 
+ has the methods to push predicates to the loader. It is different than LoadMetadata.setPartitionFilter in that loader may load records which does not satisfy the predicates. In other words, predicates is only a hint. Note this interface is still in development and might change in next version. Currently only OrcStorage implements this interface.</li>
 </ul>
 
  <p>The LoadFunc abstract class is the main class to extend for implementing a loader. The methods which need to be overridden are explained below:</p>
@@ -927,6 +1058,10 @@ has methods to convert byte arrays to sp
  <li id="setUdfContextSignature">setUdfContextSignature(): This method will be called by Pig both in the front end and back end to pass a unique signature to the Loader. The signature can be used to store into the UDFContext any information which the Loader needs to store between various method invocations in the front end and back end. A use case is to store RequiredFieldList passed to it in LoadPushDown.pushProjection(RequiredFieldList) for use in the back end before returning tuples in getNext(). The default implementation in LoadFunc has an empty body. This method will be called before other methods. </li>
  
  <li id="relativeToAbsolutePath">relativeToAbsolutePath(): Pig runtime will call this method to allow the Loader to convert a relative load location to an absolute location. The default implementation provided in LoadFunc handles this for FileSystem locations. If the load source is something else, loader implementation may choose to override this.</li>
+
+ <li id="getCacheFiles">getCacheFiles(): Return a list of hdfs files to ship to distributed cache.</li>
+
+ <li id="getShipFiles">getShipFiles(): Return a list of local files to ship to distributed cache.</li>
  </ul>
 
 <p><strong>Example Implementation</strong></p>
@@ -1055,6 +1190,8 @@ abstract class has the main methods for 
 <ul>
 <li id="storemetadata"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreMetadata.java?view=markup">StoreMetadata:</a> 
 This interface has methods to interact with metadata systems to store schema and store statistics. This interface is optional and should only be implemented if metadata needs to stored. </li>
+<li id="storeresources"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreResources.java?view=markup">StoreResources:</a> 
+This interface has methods to put hdfs files or local files to distributed cache. </li>
 </ul>
 
 <p id="storefunc-override">The methods which need to be overridden in StoreFunc are explained below: </p>

Modified: pig/branches/spark/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/EvalFunc.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/EvalFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/EvalFunc.java Thu Nov 27 12:49:54 2014
@@ -276,7 +276,7 @@ public abstract class EvalFunc<T>  {
     }
 
     /**
-     * Allow a UDF to specify a list of files it would like placed in the distributed
+     * Allow a UDF to specify a list of hdfs files it would like placed in the distributed
      * cache.  These files will be put in the cache for every job the UDF is used in.
      * The default implementation returns null.
      * @return A list of files
@@ -285,6 +285,17 @@ public abstract class EvalFunc<T>  {
         return null;
     }
 
+    /**
+     * Allow a UDF to specify a list of local files it would like placed in the distributed
+     * cache. These files will be put in the cache for every job the UDF is used in. Check for
+     * {@link FuncUtils} for utility function to facilitate it
+     * The default implementation returns null.
+     * @return A list of files
+     */
+    public List<String> getShipFiles() {
+        return null;
+    }
+
     public PigLogger getPigLogger() {
         return pigLogger;
     }

Modified: pig/branches/spark/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/LoadFunc.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/LoadFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/LoadFunc.java Thu Nov 27 12:49:54 2014
@@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
-
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
@@ -305,4 +304,24 @@ public abstract class LoadFunc {
     public final void warn(String msg, Enum warningEnum) {
         PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
     }
+
+    /**
+     * Allow a LoadFunc to specify a list of files it would like placed in the distributed 
+     * cache.
+     * The default implementation returns null.
+     * @return A list of files
+     */
+    public List<String> getCacheFiles() {
+        return null;
+    }
+
+    /**
+     * Allow a LoadFunc to specify a list of files located locally and would like to ship to backend 
+     * (through distributed cache). Check for {@link FuncUtils} for utility function to facilitate it
+     * The default implementation returns null.
+     * @return A list of files
+     */
+    public List<String> getShipFiles() {
+        return null;
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/LoadPredicatePushdown.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/LoadPredicatePushdown.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/LoadPredicatePushdown.java (original)
+++ pig/branches/spark/src/org/apache/pig/LoadPredicatePushdown.java Thu Nov 27 12:49:54 2014
@@ -28,9 +28,14 @@ import org.apache.pig.classification.Int
  * This interface defines how a loader can support predicate pushdown.
  * If a given loader implements this interface, pig will pushdown predicates based on
  * type of operations supported by the loader on given set of fields.
+ *
+ * This interface is private in Pig 0.14 and will be made public in Pig 0.15 after PIG-4093.
+ * It is to be used only by builtin LoadFunc implementations till it is made public
+ * as PIG-4093 will cause API changes to this interface and make it backward incompatible.
+ *
  * @since Pig 0.14
  */
-@InterfaceAudience.Public
+@InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface LoadPredicatePushdown {
     /**

Modified: pig/branches/spark/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/Main.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/Main.java (original)
+++ pig/branches/spark/src/org/apache/pig/Main.java Thu Nov 27 12:49:54 2014
@@ -51,6 +51,7 @@ import org.antlr.runtime.RecognitionExce
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
@@ -79,6 +80,11 @@ import org.apache.pig.tools.pigstats.Pig
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.timer.PerformanceTimerFactory;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.joda.time.PeriodType;
+import org.joda.time.format.PeriodFormat;
 
 /**
  * Main class for Pig engine.
@@ -165,11 +171,14 @@ public class Main {
     }
 
     static int run(String args[], PigProgressNotificationListener listener) {
+        DateTime startTime = new DateTime();
         int rc = 1;
         boolean verbose = false;
         boolean gruntCalled = false;
         boolean deleteTempFiles = true;
         String logFileName = null;
+        boolean printScriptRunTime = true;
+        PigContext pigContext = null;
 
         try {
             Configuration conf = new Configuration(false);
@@ -284,6 +293,7 @@ public class Main {
                     return ReturnCode.SUCCESS;
 
                 case 'i':
+                    printScriptRunTime = false;
                     System.out.println(getVersionString());
                     return ReturnCode.SUCCESS;
 
@@ -306,11 +316,11 @@ public class Main {
 
                 case 'M':
                     // turns off multiquery optimization
-                    properties.setProperty(PigConfiguration.OPT_MULTIQUERY,""+false);
+                    properties.setProperty(PigConfiguration.PIG_OPT_MULTIQUERY,""+false);
                     break;
 
                 case 'N':
-                    properties.setProperty(PigConfiguration.OPT_FETCH,""+false);
+                    properties.setProperty(PigConfiguration.PIG_OPT_FETCH,""+false);
                     break;
 
                 case 'p':
@@ -338,6 +348,9 @@ public class Main {
 
                 case 'x':
                     properties.setProperty("exectype", opts.getValStr());
+                    if (opts.getValStr().toLowerCase().contains("local")) {
+                        UserGroupInformation.setConfiguration(new Configuration(false));
+                    }
                     break;
 
                 case 'P':
@@ -368,7 +381,7 @@ public class Main {
             }
 
             // create the context with the parameter
-            PigContext pigContext = new PigContext(properties);
+            pigContext = new PigContext(properties);
 
             // create the static script state object
             ScriptState scriptState = pigContext.getExecutionEngine().instantiateScriptState();
@@ -650,16 +663,31 @@ public class Main {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
             }
         } finally {
+            if (printScriptRunTime) {
+                printScriptRunTime(startTime);
+            }
             if (deleteTempFiles) {
                 // clear temp files
                 FileLocalizer.deleteTempFiles();
             }
+            if (pigContext != null) {
+                pigContext.getExecutionEngine().destroy();
+            }
             PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();
         }
 
         return rc;
     }
 
+    private static void printScriptRunTime(DateTime startTime) {
+        DateTime endTime = new DateTime();
+        Duration duration = new Duration(startTime, endTime);
+        Period period = duration.toPeriod().normalizedStandard(PeriodType.time());
+        log.info("Pig script completed in "
+                + PeriodFormat.getDefault().print(period)
+                + " (" + duration.getMillis() + " ms)");
+    }
+
     protected static PigProgressNotificationListener makeListener(Properties properties) {
 
         try {
@@ -871,7 +899,7 @@ public class Main {
             System.out.println("        All optimizations listed here are enabled by default. Optimization values are case insensitive.");
             System.out.println("    -v, -verbose - Print all error messages to screen");
             System.out.println("    -w, -warning - Turn warning logging on; also turns warning aggregation off");
-            System.out.println("    -x, -exectype - Set execution mode: local|mapreduce, default is mapreduce.");
+            System.out.println("    -x, -exectype - Set execution mode: local|mapreduce|tez, default is mapreduce.");
             System.out.println("    -F, -stop_on_failure - Aborts execution on the first failed job; default is off");
             System.out.println("    -M, -no_multiquery - Turn multiquery optimization off; default is on");
             System.out.println("    -N, -no_fetch - Turn fetch optimization off; default is on");
@@ -912,8 +940,8 @@ public class Main {
             System.out.println("            If the in-map partial aggregation does not reduce the output num records");
             System.out.println("            by this factor, it gets disabled.");
             System.out.println("    Miscellaneous:");
-            System.out.println("        exectype=mapreduce|local; default is mapreduce. This property is the same as -x switch");
-            System.out.println("        pig.additional.jars=<colon seperated list of jars>. Used in place of register command.");
+            System.out.println("        exectype=mapreduce|tez|local; default is mapreduce. This property is the same as -x switch");
+            System.out.println("        pig.additional.jars.uris=<comma seperated list of jars>. Used in place of register command.");
             System.out.println("        udf.import.list=<comma seperated list of imports>. Used to avoid package names in UDF.");
             System.out.println("        stop.on.failure=true|false; default is false. Set to true to terminate on the first error.");
             System.out.println("        pig.datetime.default.tz=<UTC time offset>. e.g. +08:00. Default is the default timezone of the host.");

Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Thu Nov 27 12:49:54 2014
@@ -29,84 +29,123 @@ public class PigConfiguration {
     /////////////////////////       COMMAND LINE KEYS       /////////////////////////////
     /////////////////////////////////////////////////////////////////////////////////////
 
+    // Pig runtime optimizations
     /**
-     * Controls the fraction of total memory that is allowed to be used by
-     * cached bags. Default is 0.2.
+     * This key is to turn on auto local mode feature
      */
-    public static final String PROP_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
-
+    public static final String PIG_AUTO_LOCAL_ENABLED = "pig.auto.local.enabled";
     /**
-     * Controls whether partial aggregation is turned on
+     * Controls the max threshold size to convert jobs to run in local mode
      */
-    public static final String PROP_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
+    public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes";
 
     /**
-     * Controls the minimum reduction in-mapper Partial Aggregation should achieve in order
-     * to stay on. If after a period of observation this reduction is not achieved,
-     * in-mapper aggregation will be turned off and a message logged to that effect.
+     * Boolean value used to enable or disable fetching without a mapreduce job for DUMP. True by default
      */
-    public static final String PARTAGG_MINREDUCTION = "pig.exec.mapPartAgg.minReduction";
+    public static final String PIG_OPT_FETCH = "opt.fetch";
 
+    // Pig query planning and execution optimizations
     /**
-     * Controls whether execution time of Pig UDFs should be tracked.
-     * This feature uses counters; use judiciously.
+     * Boolean value used to enable or disable multiquery optimization. True by default
      */
-    public static final String TIME_UDFS = "pig.udf.profile";
-    public static final String TIME_UDFS_FREQUENCY = "pig.udf.profile.frequency";
+    public static final String PIG_OPT_MULTIQUERY = "opt.multiquery";
 
     /**
-     * This key must be set to true by the user for code generation to be used.
-     * In the future, it may be turned on by default (at least in certain cases),
-     * but for now it is too experimental.
+     * Boolean value used to enable or disable accumulator optimization. True by default
      */
-    public static final String SHOULD_USE_SCHEMA_TUPLE = "pig.schematuple";
-
-    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_UDF = "pig.schematuple.udf";
-
-    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_FOREACH = "pig.schematuple.foreach";
+    public static final String PIG_OPT_ACCUMULATOR = "opt.accumulator";
+    public static final String PIG_ACCUMULATIVE_BATCHSIZE = "pig.accumulative.batchsize";
 
-    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_FRJOIN = "pig.schematuple.fr_join";
+    /**
+     * This key is used to enable or disable union optimization in tez. True by default
+     */
+    public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
 
-    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_MERGEJOIN = "pig.schematuple.merge_join";
+    /**
+     * Boolean value to enable or disable partial aggregation in map. Disabled by default
+     */
+    public static final String PIG_EXEC_MAP_PARTAGG = "pig.exec.mapPartAgg";
+    /**
+     * Controls the minimum reduction in-mapper Partial Aggregation should achieve in order
+     * to stay on. If after a period of observation this reduction is not achieved,
+     * in-mapper aggregation will be turned off and a message logged to that effect.
+     */
+    public static final String PIG_EXEC_MAP_PARTAGG_MINREDUCTION = "pig.exec.mapPartAgg.minReduction";
 
-    public static final String SCHEMA_TUPLE_SHOULD_ALLOW_FORCE = "pig.schematuple.force";
+    /**
+     * Boolean value to enable or disable use of combiners in MapReduce jobs. Enabled by default
+     */
+    public static final String PIG_EXEC_NO_COMBINER = "pig.exec.nocombiner";
 
     /**
-     * This key is used to enable multiquery optimization.
+     * This key controls whether secondary sort key is used for optimization in case
+     * of nested distinct or sort
      */
-    public static final String OPT_MULTIQUERY = "opt.multiquery";
+    public static final String PIG_EXEC_NO_SECONDARY_KEY = "pig.exec.nosecondarykey";
 
+    // Pig memory usage control settings
     /**
-     * This key is used to enable accumulator optimization.
+     * Controls the fraction of total memory that is allowed to be used by
+     * cached bags. Default is 0.2.
      */
-    public static final String OPT_ACCUMULATOR = "opt.accumulator";
+    public static final String PIG_CACHEDBAG_MEMUSAGE = "pig.cachedbag.memusage";
 
+    /**
+     * % of memory available for the input data. This is currently equal to the
+     * memory available for the skewed join
+     */
+    public static final String PIG_SKEWEDJOIN_REDUCE_MEMUSAGE = "pig.skewedjoin.reduce.memusage";
 
     /**
-     * This key is used to configure auto parallelism in tez. Default is true.
+     * This key used to control the maximum size loaded into
+     * the distributed cache when doing fragment-replicated join
      */
-    public static final String TEZ_AUTO_PARALLELISM = "pig.tez.auto.parallelism";
+    public static final String PIG_JOIN_REPLICATED_MAX_BYTES = "pig.join.replicated.max.bytes";
 
+    // Pig cached bag type settings
     /**
-     * This key is used to enable union optimization.
+     * Configurations for specifying alternate implementations for cached bags. Rarely used
      */
-    public static final String TEZ_OPT_UNION = "pig.tez.opt.union";
+    public static final String PIG_CACHEDBAG_TYPE = "pig.cachedbag.type";
+    public static final String PIG_CACHEDBAG_DISTINCT_TYPE = "pig.cachedbag.distinct.type";
+    public static final String PIG_CACHEDBAG_SORT_TYPE = "pig.cachedbag.sort.type";
 
+    // Pig reducer parallelism estimation settings
+    public static final String PIG_EXEC_REDUCER_ESTIMATOR = "pig.exec.reducer.estimator";
+    public static final String PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY =  "pig.exec.reducer.estimator.arg";
     /**
-     * This key is used to define whether to reuse AM in Tez jobs.
+     * This key is used to configure auto parallelism in tez. Default is true.
      */
-    public static final String TEZ_SESSION_REUSE = "pig.tez.session.reuse";
+    public static final String PIG_TEZ_AUTO_PARALLELISM = "pig.tez.auto.parallelism";
 
+    // Pig UDF profiling settings
     /**
-     * This key is used to configure the interval of dag status report in seconds.
+     * Controls whether execution time of Pig UDFs should be tracked.
+     * This feature uses counters; use judiciously.
      */
-    public static final String TEZ_DAG_STATUS_REPORT_INTERVAL = "pig.tez.dag.status.report.interval";
+    public static final String PIG_UDF_PROFILE = "pig.udf.profile";
+    public static final String PIG_UDF_PROFILE_FREQUENCY = "pig.udf.profile.frequency";
 
+    // Pig schema tuple settings
     /**
-     * Turns off use of combiners in MapReduce jobs produced by Pig.
+     * This key must be set to true by the user for code generation to be used.
+     * In the future, it may be turned on by default (at least in certain cases),
+     * but for now it is too experimental.
      */
-    public static final String PROP_NO_COMBINER = "pig.exec.nocombiner";
+    public static final String PIG_SCHEMA_TUPLE_ENABLED = "pig.schematuple";
+
+    public static final String PIG_SCHEMA_TUPLE_USE_IN_UDF = "pig.schematuple.udf";
+
+    public static final String PIG_SCHEMA_TUPLE_USE_IN_FOREACH = "pig.schematuple.foreach";
 
+    public static final String PIG_SCHEMA_TUPLE_USE_IN_FRJOIN = "pig.schematuple.fr_join";
+
+    public static final String PIG_SCHEMA_TUPLE_USE_IN_MERGEJOIN = "pig.schematuple.merge_join";
+
+    public static final String PIG_SCHEMA_TUPLE_ALLOW_FORCE = "pig.schematuple.force";
+
+
+    // Pig Streaming settings
     /**
      * This key can be used to defined what properties will be set in the streaming environment.
      * Just set this property to a comma-delimited list of properties to set, and those properties
@@ -120,33 +159,41 @@ public class PigConfiguration {
      */
     public static final String PIG_STREAMING_UDF_PYTHON_COMMAND = "pig.streaming.udf.python.command";
 
+    // Pig input format settings
     /**
-     * This key is used to define the default load func. Pig will fallback on PigStorage
-     * as default in case this is undefined.
+     * Turns combine split files on or off
      */
-    public static final String PIG_DEFAULT_LOAD_FUNC = "pig.default.load.func";
+    public static final String PIG_SPLIT_COMBINATION = "pig.splitCombination";
 
     /**
-     * This key is used to define the default store func. Pig will fallback on PigStorage
-     * as default in case this is undefined.
+     * Whether turns combine split files off. This is for internal use only
      */
-    public static final String PIG_DEFAULT_STORE_FUNC = "pig.default.store.func";
+    public static final String PIG_NO_SPLIT_COMBINATION = "pig.noSplitCombination";
+
+    /**
+     * Specifies the size, in bytes, of data to be processed by a single map.
+     * Smaller files are combined untill this size is reached.
+     */
+    public static final String PIG_MAX_COMBINED_SPLIT_SIZE = "pig.maxCombinedSplitSize";
 
+    // Pig output format settings
+    /**
+     * This key is used to define whether PigOutputFormat will be wrapped with LazyOutputFormat
+     * so that jobs won't write empty part files if no output is generated
+     */
+    public static final String PIG_OUTPUT_LAZY = "pig.output.lazy";
     /**
      * This key is used to define whether to support recovery to handle the
      * application master getting restarted.
      */
     public static final String PIG_OUTPUT_COMMITTER_RECOVERY = "pig.output.committer.recovery.support";
 
-    /**
-     * This key is used to turn off the inclusion of settings in the jobs.
-     */
-    public static final String INSERT_ENABLED = "pig.script.info.enabled";
+    //Pig intermediate temporary file settings
 
     /**
-     * Controls the size of Pig script stored in job xml.
+     * Location where pig stores temporary files for job setup
      */
-    public static final String MAX_SCRIPT_SIZE = "pig.script.max.size";
+    public static final String PIG_TEMP_DIR = "pig.temp.dir";
 
     /**
      * This key is used to define whether to have intermediate file compressed
@@ -171,44 +218,12 @@ public class PigConfiguration {
      */
     public static final String PIG_DELETE_TEMP_FILE = "pig.delete.temp.files";
 
-    /**
-     * For a given mean and a confidence, a sample rate is obtained from a poisson udf
-     */
-    public static final String SAMPLE_RATE = "pig.sksampler.samplerate";
 
+    //Pig skewedjoin and order by sampling settings
     /**
-     * % of memory available for the input data. This is currently equal to the
-     * memory available for the skewed join
-     */
-    public static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
-
-    /**
-     * This key used to control the maximum size loaded into
-     * the distributed cache when doing fragment-replicated join
-     */
-    public static final String PIG_JOIN_REPLICATED_MAX_BYTES = "pig.join.replicated.max.bytes";
-
-    /**
-     * Turns combine split files on or off
-     */
-    public static final String PIG_SPLIT_COMBINATION = "pig.splitCombination";
-
-    /**
-     * Whether turns combine split files off. This is for internal use only
-     */
-    public static final String PIG_NO_SPLIT_COMBINATION = "pig.noSplitCombination";
-
-    /**
-     * Specifies the size, in bytes, of data to be processed by a single map.
-     * Smaller files are combined untill this size is reached.
-     */
-    public static final String PIG_MAX_COMBINED_SPLIT_SIZE = "pig.maxCombinedSplitSize";
-
-    /**
-     * This key controls whether secondary sort key is used for optimization in case
-     * of nested distinct or sort
+     * For a given mean and a confidence, a sample rate is obtained from a poisson udf
      */
-    public static final String PIG_EXEC_NO_SECONDARY_KEY = "pig.exec.nosecondarykey";
+    public static final String PIG_POISSON_SAMPLER_SAMPLE_RATE = "pig.sksampler.samplerate";
 
     /**
      * This key used to control the sample size of RandomeSampleLoader for
@@ -216,31 +231,29 @@ public class PigConfiguration {
      */
     public static final String PIG_RANDOM_SAMPLER_SAMPLE_SIZE = "pig.random.sampler.sample.size";
 
-    /**
-     * This key is to turn on auto local mode feature
-     */
-    public static final String PIG_AUTO_LOCAL_ENABLED = "pig.auto.local.enabled";
 
+    //Pig miscellaneous settings
     /**
-     * Controls the max threshold size to convert jobs to run in local mode
+     * This key is used to define the default load func. Pig will fallback on PigStorage
+     * as default in case this is undefined.
      */
-    public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes";
+    public static final String PIG_DEFAULT_LOAD_FUNC = "pig.default.load.func";
 
     /**
-     * This parameter enables/disables fetching. By default it is turned on.
+     * This key is used to define the default store func. Pig will fallback on PigStorage
+     * as default in case this is undefined.
      */
-    public static final String OPT_FETCH = "opt.fetch";
+    public static final String PIG_DEFAULT_STORE_FUNC = "pig.default.store.func";
 
     /**
-     * This key is used to define whether PigOutputFormat will be wrapped with LazyOutputFormat
-     * so that jobs won't write empty part files if no output is generated
+     * This key is used to turn off the inclusion of settings in the jobs.
      */
-    public static final String PIG_OUTPUT_LAZY = "pig.output.lazy";
+    public static final String PIG_SCRIPT_INFO_ENABLED = "pig.script.info.enabled";
 
     /**
-     * Location where pig stores temporary files for job setup
+     * Controls the size of Pig script stored in job xml.
      */
-    public static final String PIG_TEMP_DIR = "pig.temp.dir";
+    public static final String PIG_SCRIPT_MAX_SIZE = "pig.script.max.size";
 
     /**
      * This key is turn on the user level cache
@@ -276,9 +289,86 @@ public class PigConfiguration {
      * This key is used to turns off use of task reports in job statistics.
      */
     public static final String PIG_NO_TASK_REPORT = "pig.stats.notaskreport";
-    
-    public static final String PIG_CROSS_PARALLELISM_HINT = "pig.cross.parallelism.hint";
 
-    public static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
-    public static final String REDUCER_ESTIMATOR_ARG_KEY =  "pig.exec.reducer.estimator.arg";
+    /**
+     * The timezone to be used by Pig datetime datatype
+     */
+    public static final String PIG_DATETIME_DEFAULT_TIMEZONE = "pig.datetime.default.tz";
+
+
+    // Pig on Tez runtime settings
+    /**
+     * This key is used to define whether to reuse AM in Tez jobs.
+     */
+    public static final String PIG_TEZ_SESSION_REUSE = "pig.tez.session.reuse";
+
+    /**
+     * This key is used to configure the interval of dag status report in seconds. Default is 20
+     */
+    public static final String PIG_TEZ_DAG_STATUS_REPORT_INTERVAL = "pig.tez.dag.status.report.interval";
+
+
+
+    // Deprecated settings of Pig 0.13
+
+    /**
+     * @deprecated use {@link #PIG_OPT_FETCH} instead. Will be removed in Pig 0.16
+     */
+    @Deprecated
+    public static final String OPT_FETCH = PIG_OPT_FETCH;
+
+    /**
+     * @deprecated use {@link #PIG_CACHEDBAG_MEMUSAGE} instead. Will be removed in Pig 0.16
+     */
+    @Deprecated
+    public static final String PROP_CACHEDBAG_MEMUSAGE = PIG_CACHEDBAG_MEMUSAGE;
+
+    /**
+     * @deprecated use {@link #PIG_EXEC_MAP_PARTAGG} instead. Will be removed in Pig 0.16
+     */
+    @Deprecated
+    public static final String PROP_EXEC_MAP_PARTAGG = PIG_EXEC_MAP_PARTAGG;
+
+    /**
+     * @deprecated use {@link #PIG_EXEC_MAP_PARTAGG_MINREDUCTION} instead. Will be removed in Pig 0.16
+     */
+    @Deprecated
+    public static final String PARTAGG_MINREDUCTION = PIG_EXEC_MAP_PARTAGG_MINREDUCTION;
+
+    /**
+     * @deprecated use {@link #PROP_NO_COMBINER1} instead. Will be removed in Pig 0.16
+     */
+    @Deprecated
+    public static final String PROP_NO_COMBINER = PIG_EXEC_NO_COMBINER;
+
+    @Deprecated
+    public static final String SHOULD_USE_SCHEMA_TUPLE = PIG_SCHEMA_TUPLE_ENABLED;
+
+    @Deprecated
+    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_UDF = PIG_SCHEMA_TUPLE_USE_IN_UDF;
+
+    @Deprecated
+    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_FOREACH = PIG_SCHEMA_TUPLE_USE_IN_FOREACH;
+
+    @Deprecated
+    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_FRJOIN = PIG_SCHEMA_TUPLE_USE_IN_FRJOIN;
+
+    @Deprecated
+    public static final String SCHEMA_TUPLE_SHOULD_USE_IN_MERGEJOIN = PIG_SCHEMA_TUPLE_USE_IN_MERGEJOIN;
+
+    @Deprecated
+    public static final String SCHEMA_TUPLE_SHOULD_ALLOW_FORCE = PIG_SCHEMA_TUPLE_ALLOW_FORCE;
+
+    /**
+     * @deprecated use {@link #PIG_SCRIPT_INFO_ENABLED} instead. Will be removed in Pig 0.16
+     */
+    @Deprecated
+    public static final String INSERT_ENABLED = PIG_SCRIPT_INFO_ENABLED;
+
+    /**
+     * @deprecated use {@link #PIG_SCRIPT_MAX_SIZE} instead. Will be removed in Pig 0.16
+     */
+    @Deprecated
+    public static final String MAX_SCRIPT_SIZE = PIG_SCRIPT_MAX_SIZE;
+
 }

Modified: pig/branches/spark/src/org/apache/pig/PigConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConstants.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConstants.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConstants.java Thu Nov 27 12:49:54 2014
@@ -57,4 +57,6 @@ public class PigConstants {
      */
     public static final String TIME_UDFS_INVOCATION_COUNTER = "approx_invocations";
     public static final String TIME_UDFS_ELAPSED_TIME_COUNTER = "approx_microsecs";
+
+    public static final String TASK_INDEX = "mapreduce.task.index";
 }
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigServer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigServer.java Thu Nov 27 12:49:54 2014
@@ -231,24 +231,36 @@ public class PigServer {
         addJarsFromProperties();
         markPredeployedJarsFromProperties();
 
-        PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
-
         if (ScriptState.get() == null) {
             // If Pig was started via command line, ScriptState should have been
             // already initialized in Main. If so, we should not overwrite it.
             ScriptState.start(pigContext.getExecutionEngine().instantiateScriptState());
         }
+        PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
+
     }
 
     private void addJarsFromProperties() throws ExecException {
         //add jars from properties to extraJars
         String jar_str = pigContext.getProperties().getProperty("pig.additional.jars");
+        if (jar_str==null) {
+            jar_str = "";
+        }
+        jar_str = jar_str.replaceAll(File.pathSeparator, ",");
+        if (!jar_str.isEmpty()) {
+            jar_str += ",";
+        }
 
-        if(jar_str != null){
+        String jar_str_comma = pigContext.getProperties().getProperty("pig.additional.jars.uris");
+        if (jar_str_comma!=null && !jar_str_comma.isEmpty()) {
+            jar_str = jar_str + jar_str_comma;
+        }
+
+        if(jar_str != null && !jar_str.isEmpty()){
             // Use File.pathSeparator (":" on Linux, ";" on Windows)
             // to correctly handle path aggregates as they are represented
             // on the Operating System.
-            for(String jar : jar_str.split(File.pathSeparator)){
+            for(String jar : jar_str.split(",")){
                 try {
                     registerJar(jar);
                 } catch (IOException e) {

Modified: pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Thu Nov 27 12:49:54 2014
@@ -193,4 +193,9 @@ public interface ExecutionEngine {
      */
     public void killJob(String jobID) throws BackendException;
 
+    /**
+     * Perform any cleanup operation
+     */
+    public void destroy();
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AccumuloBinaryConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AccumuloBinaryConverter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AccumuloBinaryConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/AccumuloBinaryConverter.java Thu Nov 27 12:49:54 2014
@@ -172,9 +172,12 @@ public class AccumuloBinaryConverter imp
 
         len = len ^ 0x80000000;
 
-        dos.writeInt(len);
-        dos.write(bytes);
-        dos.close();
+        try {
+            dos.writeInt(len);
+            dos.write(bytes);
+        } finally {
+            dos.close();
+        }
 
         return ret;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/accumulo/Utils.java Thu Nov 27 12:49:54 2014
@@ -297,8 +297,6 @@ public class Utils {
             zos.write(arr, 0, read);
             read = is.read(arr);
         }
-        is.close();
-        zos.closeEntry();
     }
 
     public static void jarDir(File dir, String relativePath, ZipOutputStream zos)
@@ -317,7 +315,16 @@ public class Utils {
             zos.closeEntry();
         } else {
             InputStream is = new FileInputStream(manifestFile);
-            copyToZipStream(is, manifestEntry, zos);
+            try {
+                copyToZipStream(is, manifestEntry, zos);
+            } finally {
+                if (is != null) {
+                    is.close();
+                }
+                if (zos != null) {
+                    zos.closeEntry();
+                }
+            }
         }
         zos.closeEntry();
         zipDir(dir, relativePath, zos, true);
@@ -345,7 +352,16 @@ public class Utils {
                     if (!path.equals(JarFile.MANIFEST_NAME)) {
                         ZipEntry anEntry = new ZipEntry(path);
                         InputStream is = new FileInputStream(f);
-                        copyToZipStream(is, anEntry, zos);
+                        try {
+                            copyToZipStream(is, anEntry, zos);
+                        } finally {
+                            if (is != null) {
+                                is.close();
+                            }
+                            if (zos != null) {
+                                zos.closeEntry();
+                            }
+                        }
                     }
                 }
             }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Thu Nov 27 12:49:54 2014
@@ -79,7 +79,11 @@ public class ConfigurationUtil {
             localConf = new Configuration(false);
             localConf.addResource("core-default.xml");
         } else {
-            localConf = new Configuration(true);
+            if (PigMapReduce.sJobContext!=null) {
+                localConf = PigMapReduce.sJobContext.getConfiguration();
+            } else {
+                localConf = new Configuration(true);
+            }
             // It's really hacky, try to get unit test working under hadoop 23.
             // Hadoop23 MiniMRCluster currently need setup Distributed cache before start,
             // so build/classes/hadoop-site.xml contains such entry. This prevents some tests from

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Nov 27 12:49:54 2014
@@ -24,6 +24,7 @@ import java.io.PrintStream;
 import java.net.URL;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
@@ -46,6 +47,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -108,6 +110,23 @@ public abstract class HExecutionEngine i
         init(this.pigContext.getProperties());
     }
 
+    // Loads S3 properties from core-site.xml including aws keys that are needed
+    // for both local and non-local mode.
+    public JobConf getS3Conf() throws ExecException {
+        JobConf jc = new JobConf();
+        jc.addResource(CORE_SITE);
+        Iterator<Entry<String, String>> i = jc.iterator();
+        while (i.hasNext()) {
+            Entry<String, String> e = i.next();
+            String key = e.getKey();
+            String value = e.getValue();
+            if (key.startsWith("fs.s3") || key.startsWith("fs.s3n")) {
+                jc.set(key, value);
+            }
+        }
+        return jc;
+    }
+
     public JobConf getLocalConf() {
         JobConf jc = new JobConf(false);
 
@@ -146,6 +165,7 @@ public abstract class HExecutionEngine i
         return jc;
     }
 
+    @SuppressWarnings("resource")
     private void init(Properties properties) throws ExecException {
         String cluster = null;
         String nameNode = null;
@@ -167,9 +187,10 @@ public abstract class HExecutionEngine i
         // existing properties All of the above is accomplished in the method
         // call below
 
-        JobConf jc = null;
+        JobConf jc = getS3Conf();
         if (!this.pigContext.getExecType().isLocal()) {
-            jc = getExecConf(properties);
+            JobConf execConf = getExecConf(properties);
+            ConfigurationUtil.mergeConf(jc, execConf);
 
             // Trick to invoke static initializer of DistributedFileSystem to
             // add hdfs-default.xml into configuration
@@ -183,7 +204,8 @@ public abstract class HExecutionEngine i
             properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
             properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
 
-            jc = getLocalConf();
+            JobConf localConf = getLocalConf();
+            ConfigurationUtil.mergeConf(jc, localConf);
         }
 
         // the method below alters the properties object by overriding the
@@ -296,7 +318,7 @@ public abstract class HExecutionEngine i
 
         PrintStream pps = ps;
         PrintStream eps = ps;
-
+        boolean isFetchable = false;
         try {
             if (file != null) {
                 pps = new PrintStream(new File(file, "physical_plan-" + suffix));
@@ -307,13 +329,16 @@ public abstract class HExecutionEngine i
             pp.explain(pps, format, verbose);
 
             MapRedUtil.checkLeafIsStore(pp, pigContext);
-            if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+            isFetchable = FetchOptimizer.isPlanFetchable(pc, pp);
+            if (isFetchable) {
                 new FetchLauncher(pigContext).explain(pp, pc, eps, format);
                 return;
             }
             launcher.explain(pp, pigContext, eps, format, verbose);
         } finally {
             launcher.reset();
+            if (isFetchable)
+                pigContext.getProperties().remove(PigImplConstants.CONVERTED_TO_FETCH);
             //Only close the stream if we opened it.
             if (file != null) {
                 pps.close();
@@ -352,4 +377,11 @@ public abstract class HExecutionEngine i
         }
     }
 
+    @Override
+    public void destroy() {
+        if (launcher != null) {
+            launcher.destroy();
+        }
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Thu Nov 27 12:49:54 2014
@@ -22,6 +22,7 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -160,23 +161,25 @@ public abstract class Launcher {
         return (int) (Math.ceil(prog)) == 1;
     }
 
-    protected long computeTimeSpent(TaskReport[] taskReports) {
+    protected long computeTimeSpent(Iterator<TaskReport> taskReports) {
         long timeSpent = 0;
-        for (TaskReport r : taskReports) {
+        while (taskReports.hasNext()) {
+            TaskReport r = taskReports.next();
             timeSpent += (r.getFinishTime() - r.getStartTime());
         }
         return timeSpent;
     }
 
-    protected void getErrorMessages(TaskReport reports[], String type,
+    protected void getErrorMessages(Iterator<TaskReport> reports, String type,
             boolean errNotDbg, PigContext pigContext) throws Exception {
-        for (int i = 0; i < reports.length; i++) {
-            String msgs[] = reports[i].getDiagnostics();
+        while(reports.hasNext()) {
+            TaskReport report = reports.next();
+            String msgs[] = report.getDiagnostics();
             ArrayList<Exception> exceptions = new ArrayList<Exception>();
             String exceptionCreateFailMsg = null;
             boolean jobFailed = false;
             if (msgs.length > 0) {
-                if (HadoopShims.isJobFailed(reports[i])) {
+                if (HadoopShims.isJobFailed(report)) {
                     jobFailed = true;
                 }
                 Set<String> errorMessageSet = new HashSet<String>();
@@ -199,7 +202,7 @@ public abstract class Launcher {
                             }
                         } else {
                             log.debug("Error message from task (" + type + ") "
-                                    + reports[i].getTaskID() + msgs[j]);
+                                    + report.getTaskID() + msgs[j]);
                         }
                     }
                 }
@@ -223,7 +226,7 @@ public abstract class Launcher {
                 if (exceptions.size() > 1) {
                     for (int j = 0; j < exceptions.size(); ++j) {
                         String headerMessage = "Error message from task ("
-                                + type + ") " + reports[i].getTaskID();
+                                + type + ") " + report.getTaskID();
                         LogUtils.writeLog(exceptions.get(j), pigContext
                                 .getProperties().getProperty("pig.logfile"),
                                 log, false, headerMessage, false, false);
@@ -276,14 +279,18 @@ public abstract class Launcher {
     public class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
         @Override
         public void uncaughtException(Thread thread, Throwable throwable) {
-            jobControlExceptionStackTrace = Utils.getStackStraceStr(throwable);
-            try {
-                jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
-            } catch (Exception e) {
-                String errMsg = "Could not resolve error that occured when launching job: "
-                        + jobControlExceptionStackTrace;
-                jobControlException = new RuntimeException(errMsg, throwable);
-            }
+            setJobException(throwable);
+        }
+    }
+
+    protected void setJobException(Throwable throwable) {
+        jobControlExceptionStackTrace = Utils.getStackStraceStr(throwable);
+        try {
+            jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
+        } catch (Exception e) {
+            String errMsg = "Could not resolve error that occured when launching job: "
+                    + jobControlExceptionStackTrace;
+            jobControlException = new RuntimeException(errMsg, throwable);
         }
     }
 
@@ -634,4 +641,7 @@ public abstract class Launcher {
         return new StackTraceElement(declaringClass, methodName, fileName,
                 lineNumber);
     }
+
+    public void destroy() {
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Thu Nov 27 12:49:54 2014
@@ -25,7 +25,6 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.TaskContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -40,14 +39,15 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.EmptyPigStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
-import org.joda.time.DateTimeZone;
 
 /**
  * This class is responsible for executing the fetch task, saving the result to disk
@@ -88,6 +88,7 @@ public class FetchLauncher {
         }
         finally {
             UDFContext.getUDFContext().addJobConf(null);
+            pigContext.getProperties().remove(PigImplConstants.CONVERTED_TO_FETCH);
         }
     }
 
@@ -140,11 +141,7 @@ public class FetchLauncher {
         udfContext.serialize(conf);
 
         PigMapReduce.sJobConfInternal.set(conf);
-        String dtzStr = conf.get("pig.datetime.default.tz");
-        if (dtzStr != null && dtzStr.length() > 0) {
-            // ensure that the internal timezone is uniformly in UTC offset style
-            DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
-        }
+        Utils.setDefaultTimeZone(conf);
 
         boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
         PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();