Posted to commits@pig.apache.org by ch...@apache.org on 2014/02/24 22:41:41 UTC

svn commit: r1571454 [2/5] - in /pig/branches/tez: ./ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/ contrib/piggybank/java/src/main/java/...

Modified: pig/branches/tez/src/docs/src/documentation/content/xdocs/func.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/docs/src/documentation/content/xdocs/func.xml?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/docs/src/documentation/content/xdocs/func.xml (original)
+++ pig/branches/tez/src/docs/src/documentation/content/xdocs/func.xml Mon Feb 24 21:41:38 2014
@@ -2152,8 +2152,165 @@ STORE measurements INTO 'measurements' U
    AvroStorage. See <a href="#AvroStorage">AvroStorage</a> for a detailed description of the
    arguments for TrevniStorage.</p>
    </section>
+
+    <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+    <section id="AccumuloStorage">
+        <title>AccumuloStorage</title>
+        <p>Loads or stores data from an Accumulo table. The first element in a Tuple is equivalent to the "row"
+            from the Accumulo Key, while the columns in that row can be grouped in various static or wildcarded
+            ways. Basic wildcarding functionality exists to group various column families/qualifiers into a Map for
+            LOADs, or serialize a Map into some group of column families or qualifiers on STOREs.
+        </p>
+
+        <section>
+            <title>Syntax</title>
+            <table>
+                <tr>
+                     <td>
+                        <p>AccumuloStorage(['columns'[, 'options']])</p>
+                     </td>
+                  </tr>
+            </table>
+        </section>
+
+        <section>
+            <title>Arguments</title>
+            <table>
+                <tr>
+                    <td>
+                        <p>'columns'</p>
+                    </td>
+                    <td>
+                        <p>A comma-separated list of "columns" to read data from or write data to.
+                           Each of these columns can be considered one of three different types:
+                        </p>
+
+                        <ol>
+                            <li>Literal</li>
+                            <li>Column family prefix</li>
+                            <li>Column qualifier prefix</li>
+                        </ol>
+
+                        <p><strong>Literal:</strong> this is the simplest specification:
+                           a colon-delimited string that maps to a column family and column
+                           qualifier. This will read/write a simple scalar from/to Accumulo.
+                        </p>
+
+                        <p><strong>Column family prefix:</strong> When reading data, this
+                            will fetch data from Accumulo Key-Values in the current row whose column family matches the 
+                            given prefix. This will result in a Map being placed into the Tuple. When writing
+                            data, a Map is expected at the given offset in the Tuple: its keys are 
+                            appended to the column family prefix, an empty column qualifier is used, and the Map
+                            value is placed in the Accumulo Value. A valid column family prefix is a literal
+                            asterisk (*), in which case the Map key will be equivalent to the Accumulo column family.
+                        </p>
+                        
+                        <p><strong>Column qualifier prefix:</strong> Similar to the column
+                            family prefix except it operates on the column qualifier. On reads, Accumulo Key-Values
+                            in the same row that match the given column family and column qualifier prefix will be
+                            placed into a single Map. On writes, the provided column family from the column specification
+                            will be used, the Map key will be appended to the column qualifier provided in the specification,
+                            and the Map Value will be the Accumulo Value.
+                        </p>
+
+                        <p>When "columns" is not provided or is a blank String, it is treated equivalently to "*".
+                            This is to say that when a column specification string is not provided, for reads, all columns
+                            in the given Accumulo row will be placed into a single Map (with the Map keys being colon
+                            delimited to preserve the column family/qualifier from Accumulo). For writes, the Map keys
+                            will be placed into the column family and the column qualifier will be empty.
+                        </p>
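+
+                        <p>As an illustrative sketch, the following hypothetical column
+                            specifications (the family and qualifier names are made up) exercise
+                            each of the three types:
+                        </p>
+<source>
+-- Literal: reads/writes the single column with family "building", qualifier "num_terminals"
+'building:num_terminals'
+
+-- Column family prefix: all column families starting with "carrier" are grouped into one Map
+'carrier*'
+
+-- Column qualifier prefix: qualifiers starting with "transportation" in family "reviews"
+'reviews:transportation*'
+</source>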
+                    </td>
+               </tr>
+               <tr>
+                    <td>
+                        <p>'options'</p>
+                    </td>
+                    <td>
+                        <p>A string that contains space-separated options ("-optionA valueA -optionB valueB -optionC valueC")</p>
+                        <p>The currently supported options are listed below; a combined example follows the list:</p>
+                        <ul>
+                            <li>(-c|--caster) LoadStoreCasterImpl An implementation of a LoadStoreCaster to use when serializing types into Accumulo,
+                                usually AccumuloBinaryConverter or Utf8StorageConverter; defaults to Utf8StorageConverter.
+                            </li>
+                            <li>(-auths|--authorizations) auth1,auth2... A comma-separated list of Accumulo authorizations to use when reading
+                                data from Accumulo. Defaults to the empty set of authorizations (none).
+                            </li>
+                            <li>(-s|--start) start_row The Accumulo row to begin reading from, inclusive</li>
+                            <li>(-e|--end) end_row The Accumulo row to read until, inclusive</li>
+                            <li>(-buff|--mutation-buffer-size) num_bytes The number of bytes to buffer when writing data to Accumulo. A higher
+                                value requires more memory</li>
+                            <li>(-wt|--write-threads) num_threads The number of threads used to write data to Accumulo.</li>
+                            <li>(-ml|--max-latency) milliseconds Maximum time in milliseconds before data is flushed to Accumulo.</li>
+                            <li>(-sep|--separator) str The separator character used when parsing the column specification, defaults to comma (,)</li>
+                            <li>(-iw|--ignore-whitespace) (true|false) Should whitespace be stripped from the column specification, defaults to true</li>
+                       </ul>
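+                        <p>For instance, a hypothetical options string combining several of the
+                            options above (all values are illustrative only) might look like:
+                        </p>
+<source>
+'-auths auth1,auth2 -s row_0000 -e row_9999 -wt 4 -ml 30000'
+</source>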
+                    </td>
+               </tr>
+            </table>
+        </section>
+
+        <section>
+            <title>Usage</title>
+   
+            <p>AccumuloStorage can store data in, or fetch data from, Accumulo. Its goal is to provide
+                a simple, widely applicable table schema compatible with Pig's API. Each Tuple contains some subset
+                of the columns stored within one row of the Accumulo table, which depends on the columns provided
+                as an argument to the function. If '*' is provided, all columns in the table will be returned. The
+                second argument provides control over a variety of options that change how data is read and written.</p>
+            <p>When invoking Pig Scripts that use AccumuloStorage, it's important to ensure that Pig has the Accumulo
+                jars on its classpath. This is easily achieved using the ACCUMULO_HOME environment variable.
+            </p>
+<source>
+PIG_CLASSPATH="$ACCUMULO_HOME/lib/*:$PIG_CLASSPATH" pig my_script.pig
+</source>
+        </section>
+
+        <section>
+            <title>Load Example</title>
+            <p>It is simple to fetch all columns for airport codes that fall between Boston and San Francisco
+                and that can be viewed with the 'auth1' and/or 'auth2' Accumulo authorizations.</p>
+<source>
+raw = LOAD 'accumulo://airports?instance=accumulo&amp;user=root&amp;password=passwd&amp;zookeepers=localhost'
+      USING org.apache.pig.backend.hadoop.accumulo.AccumuloStorage(
+      '*', '-auths auth1,auth2 -s BOS -e SFO') AS
+      (code:chararray, all_columns:map[]);
+</source>
+            <p>The datatypes of the columns are declared with the "AS" clause. In this example, the row key,
+                which is the unique airport code, is assigned to the "code" variable, while all of the other
+                columns are placed into the map. When there is a non-empty column qualifier, the key in that
+                map will contain a colon separating the portion of the key that came from the column family
+                from the portion that came from the column qualifier. The Accumulo Value is placed in the Map value.</p>
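+
+            <p>As a hypothetical illustration (the airport data is made up), a tuple produced by the
+                query above might look like the following; note the colon-delimited map key for the
+                Key-Value that had a non-empty column qualifier:</p>
+<source>
+(BOS,[name#Logan International,building:num_terminals#4])
+</source>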
+                
+            <p>Most of the time it is neither necessary nor desirable, for performance reasons, to fetch all columns.</p>
+<source>
+raw = LOAD 'accumulo://airports?instance=accumulo&amp;user=root&amp;password=passwd&amp;zookeepers=localhost'
+      USING org.apache.pig.backend.hadoop.accumulo.AccumuloStorage(
+      'name,building:num_terminals,carrier*,reviews:transportation*') AS
+      (code:chararray, name:bytearray, num_terminals:bytearray, carrier_map:map[], transportation_reviews_map:map[]);
+</source>
+            <p>An asterisk can be used when requesting columns to group a collection of columns into a single 
+                Map instead of enumerating each column.</p>
+        </section>
+
+        <section>
+            <title>Store Example</title>
+            <p>Data can be easily stored into Accumulo.</p>
+<source>
+A = LOAD 'flights.txt' AS (id:chararray, carrier_name:chararray, src_airport:chararray, dest_airport:chararray, tail_number:int);
+STORE A INTO 'accumulo://flights?instance=accumulo&amp;user=root&amp;password=passwd&amp;zookeepers=localhost' USING 
+    org.apache.pig.backend.hadoop.accumulo.AccumuloStorage('carrier_name,src_airport,dest_airport,tail_number');
+</source>
+            <p>Here, we read the file 'flights.txt' out of HDFS into the relation A.
+                Each record carries a unique flight ID, the carrier name, the source and destination
+                airports, and the tail number. When STORE'ing back into Accumulo, we specify the column
+                specifications (in this case, just column families). It is also important to note that only
+                four elements are provided as columns because the first element in the Tuple is used as the
+                row in Accumulo.
+            </p>
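+            <p>As a hedged sketch (the flight data is hypothetical), a single input record with
+                id 'flight_001', carrier 'Delta', airports 'BOS' and 'SFO', and tail number 1234
+                would be written as four Accumulo Key-Values sharing one row:</p>
+<source>
+row=flight_001  cf=carrier_name  cq=  value=Delta
+row=flight_001  cf=src_airport   cq=  value=BOS
+row=flight_001  cf=dest_airport  cq=  value=SFO
+row=flight_001  cf=tail_number   cq=  value=1234
+</source>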
+        </section>
+   </section>
 </section>
 
+
 <!-- ======================================================== -->  
 <!-- ======================================================== -->  
 <!-- Math Functions -->

Modified: pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml Mon Feb 24 21:41:38 2014
@@ -1047,6 +1047,34 @@ java -cp $PIG_HOME/pig.jar 
 </ul>
 <p></p>
 </section>
+
+<!-- +++++++++++++++++++++++++++++++ -->
+<section id="direct-fetch">
+<title>Direct Fetch</title>
+<p>When the <a href="test.html#dump">DUMP</a> operator is used to execute Pig Latin statements, Pig can minimize latency by reading data directly from HDFS rather than launching MapReduce jobs.</p>
+
+<p>
+The result is fetched if the query contains only the following operators: 
+<a href="basic.html#filter">FILTER</a>, 
+<a href="basic.html#foreach">FOREACH</a>, 
+<a href="basic.html#limit">LIMIT</a>, 
+<a href="basic.html#stream">STREAM</a>, 
+<a href="basic.html#union">UNION</a>.
+<br></br>
+Fetching will be disabled in case of:
+</p>
+<ul>
+  <li>the presence of other operators, <a href="http://pig.apache.org/docs/r0.13.0/api/org/apache/pig/impl/builtin/SampleLoader.html">sample loaders</a> and scalar expressions</li>
+  <li>implicit splits</li>
+</ul>
+
+<p>
+You can check whether a query will be fetched by running EXPLAIN: you should see "No MR jobs. Fetch only." in the MapReduce part of the plan.
+<br></br>
+Direct fetch is turned on by default. To turn it off, set the property opt.fetch to false or start Pig with the "-N" or "-no_fetch" option.
+</p>
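+
+<p>As a minimal sketch (the file and field names are illustrative), the following script contains
+only fetch-eligible operators, so the DUMP returns results without launching an MR job:</p>
+<source>
+A = LOAD 'data.txt' AS (name:chararray, value:int);
+B = FILTER A BY value > 0;
+DUMP B;
+</source>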
+
+</section>
 </section>
   
 <!-- ==================================================================== -->
@@ -1113,9 +1141,9 @@ associated with a given key is too large
 <title>Usage</title>
 <p>Perform a skewed join with the USING clause (see <a href="basic.html#JOIN-inner">JOIN (inner)</a> and <a href="basic.html#JOIN-outer">JOIN (outer)</a>). </p>
 <source>
-big = LOAD 'big_data' AS (b1,b2,b3);
-massive = LOAD 'massive_data' AS (m1,m2,m3);
-C = JOIN big BY b1, massive BY m1 USING 'skewed';
+A = LOAD 'skewed_data' AS (a1,a2,a3);
+B = LOAD 'data' AS (b1,b2,b3);
+C = JOIN A BY a1, B BY b1 USING 'skewed';
 </source>
 </section>
 
@@ -1125,8 +1153,9 @@ C = JOIN big BY b1, massive BY m1 USING 
 Skewed join will only work under these conditions: 
 </p>
 <ul>
-<li>Skewed join works with two-table inner join. Currently we do not support more than two tables for skewed join. 
+<li>Skewed join works with two-table inner and outer join. Currently we do not support more than two tables for skewed join. 
 Specifying three-way (or more) joins will fail validation. For such joins, we rely on you to break them up into two-way joins.</li>
+<li>The skewed table must be specified as the left table. Pig samples that table to determine the number of reducers per key.</li>
 <li>The pig.skewedjoin.reduce.memusage Java parameter specifies the fraction of heap available for the 
 reducer to perform the join. A low fraction forces Pig to use more reducers but increases 
 copying cost. We have seen good performance when we set this value 

Modified: pig/branches/tez/src/org/apache/pig/ExecTypeProvider.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/ExecTypeProvider.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/ExecTypeProvider.java (original)
+++ pig/branches/tez/src/org/apache/pig/ExecTypeProvider.java Mon Feb 24 21:41:38 2014
@@ -40,7 +40,7 @@ public class ExecTypeProvider {
         for (ExecType execType : frameworkLoader) {
             log.info("Trying ExecType : " + execType);
             if (execType.accepts(properties)) {
-                log.debug("Picked " + execType + " as the ExecType");
+                log.info("Picked " + execType + " as the ExecType");
                 return getSingleton(execType);
             } else {
                 log.debug("Cannot pick " + execType + " as the ExecType");

Modified: pig/branches/tez/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/LoadFunc.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/LoadFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/LoadFunc.java Mon Feb 24 21:41:38 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.Tuple;
@@ -302,8 +303,6 @@ public abstract class LoadFunc {
      * @param warningEnum type of warning
      */
     public final void warn(String msg, Enum warningEnum) {
-        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
-        if (counter!=null)
-            counter.increment(1);
+        PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/Main.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/Main.java (original)
+++ pig/branches/tez/src/org/apache/pig/Main.java Mon Feb 24 21:41:38 2014
@@ -208,6 +208,7 @@ public class Main {
             opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
             opts.registerOpt('F', "stop_on_failure", CmdLineParser.ValueExpected.NOT_ACCEPTED);
             opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+            opts.registerOpt('N', "no_fetch", CmdLineParser.ValueExpected.NOT_ACCEPTED);
             opts.registerOpt('P', "propertyFile", CmdLineParser.ValueExpected.REQUIRED);
 
             ExecMode mode = ExecMode.UNKNOWN;
@@ -300,6 +301,10 @@ public class Main {
                     properties.setProperty(PigConfiguration.OPT_MULTIQUERY,""+false);
                     break;
 
+                case 'N':
+                    properties.setProperty(PigConfiguration.OPT_FETCH,""+false);
+                    break;
+
                 case 'p':
                     params.add(opts.getValStr());
                     break;
@@ -863,6 +868,7 @@ public class Main {
             System.out.println("    -x, -exectype - Set execution mode: local|mapreduce, default is mapreduce.");
             System.out.println("    -F, -stop_on_failure - Aborts execution on the first failed job; default is off");
             System.out.println("    -M, -no_multiquery - Turn multiquery optimization off; default is on");
+            System.out.println("    -N, -no_fetch - Turn fetch optimization off; default is on");
             System.out.println("    -P, -propertyFile - Path to property file");
             System.out.println("    -printCmdDebug - Overrides anything else and prints the actual command used to run Pig, including");
             System.out.println("                     any environment variables that are set by the pig command.");
@@ -885,6 +891,8 @@ public class Main {
             System.out.println("            Only disable combiner as a temporary workaround for problems.");
             System.out.println("        opt.multiquery=true|false; multiquery is on by default.");
             System.out.println("            Only disable multiquery as a temporary workaround for problems.");
+            System.out.println("        opt.fetch=true|false; fetch is on by default.");
+            System.out.println("            Scripts containing Filter, Foreach, Limit, Stream, and Union can be dumped without MR jobs.");
             System.out.println("        pig.tmpfilecompression=true|false; compression is off by default.");
             System.out.println("            Determines whether output of intermediate jobs is compressed.");
             System.out.println("        pig.tmpfilecompression.codec=lzo|gzip; default is gzip.");

Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Mon Feb 24 21:41:38 2014
@@ -125,7 +125,7 @@ public class PigConfiguration {
      */
     public static final String MAX_SCRIPT_SIZE = "pig.script.max.size";
 
-   /**
+    /**
      * This key is used to define whether to have intermediate file compressed
      */
     public static final String PIG_ENABLE_TEMP_FILE_COMPRESSION = "pig.tmpfilecompression";
@@ -160,9 +160,9 @@ public class PigConfiguration {
     public static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
 
     /**
-      * This key used to control the maximum size loaded into
-      * the distributed cache when doing fragment-replicated join
-      */
+     * This key is used to control the maximum size loaded into
+     * the distributed cache when doing fragment-replicated join
+     */
     public static final String PIG_JOIN_REPLICATED_MAX_BYTES = "pig.join.replicated.max.bytes";
 
     /**
@@ -186,5 +186,49 @@ public class PigConfiguration {
      * of nested distinct or sort
      */
     public static final String PIG_EXEC_NO_SECONDARY_KEY = "pig.exec.nosecondarykey";
+ 
+    /**
+     * This key is used to control the sample size of RandomSampleLoader for
+     * order-by. The default value is 100 rows per task.
+     */
+    public static final String PIG_RANDOM_SAMPLER_SAMPLE_SIZE = "pig.random.sampler.sample.size";
+
+    /**
+     * This key is to turn on auto local mode feature
+     */
+    public static final String PIG_AUTO_LOCAL_ENABLED = "pig.auto.local.enabled";
+
+    /**
+     * Controls the max threshold size to convert jobs to run in local mode
+     */
+    public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes";
+
+    /**
+     * This parameter enables/disables fetching. By default it is turned on.
+     */
+    public static final String OPT_FETCH = "opt.fetch";
+
+    /**
+     * This key is used to define whether PigOutputFormat will be wrapped with LazyOutputFormat
+     * so that jobs won't write empty part files if no output is generated
+     */
+    public static final String PIG_OUTPUT_LAZY = "pig.output.lazy";
+
+    /**
+     * Location where pig stores temporary files for job setup
+     */
+    public static final String PIG_TEMP_DIR = "pig.temp.dir";
+
+    /**
+     * This key is used to turn on the user-level cache
+     */
+    public static final String PIG_USER_CACHE_ENABLED = "pig.user.cache.enabled";
+
+    /**
+     * Location where additional jars are cached for the user.
+     * Additional jars will be cached under PIG_USER_CACHE_LOCATION/${user.name}/.pigcache
+     * and will be re-used across the jobs run by the user if the jar has not changed
+     */
+    public static final String PIG_USER_CACHE_LOCATION = "pig.user.cache.location";
 }
 

Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Mon Feb 24 21:41:38 2014
@@ -104,6 +104,7 @@ import org.apache.pig.tools.pigstats.Out
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.SimpleFetchPigStats;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -241,6 +242,7 @@ public class PigServer {
         }
 
         addJarsFromProperties();
+        markPredeployedJarsFromProperties();
 
         if (PigStats.get() == null) {
             PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
@@ -277,6 +279,22 @@ public class PigServer {
         }
     }
 
+    private void markPredeployedJarsFromProperties() throws ExecException {
+        // mark jars as predeployed from properties
+        String jar_str = pigContext.getProperties().getProperty("pig.predeployed.jars");
+		
+        if(jar_str != null){
+            // Use File.pathSeparator (":" on Linux, ";" on Windows)
+            // to correctly handle path aggregates as they are represented
+            // on the Operating System.
+            for(String jar : jar_str.split(File.pathSeparator)){
+                if (jar.length() > 0) {
+                    pigContext.markJarAsPredeployed(jar);
+                }
+            }
+        }
+    }
+
     public PigContext getPigContext(){
         return pigContext;
     }
@@ -413,6 +431,12 @@ public class PigServer {
      */
     protected List<ExecJob> getJobs(PigStats stats) {
         LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
+        if (stats instanceof SimpleFetchPigStats) {
+            HJob job = new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, stats.result(null)
+                    .getPOStore(), null);
+            jobs.add(job);
+            return jobs;
+        }
         JobGraph jGraph = stats.getJobGraph();
         Iterator<JobStats> iter = jGraph.iterator();
         while (iter.hasNext()) {
@@ -1719,6 +1743,9 @@ public class PigServer {
             CompilationMessageCollector collector = new CompilationMessageCollector() ;
 
             new TypeCheckingRelVisitor( lp, collector).visit();
+            new UnionOnSchemaSetter( lp ).visit();
+            new CastLineageSetter(lp, collector).visit();
+            new ScalarVariableValidator(lp).visit();
             if(aggregateWarning) {
                 CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
             } else {
@@ -1727,9 +1754,6 @@ public class PigServer {
                 }
             }
 
-            new UnionOnSchemaSetter( lp ).visit();
-            new CastLineageSetter(lp, collector).visit();
-            new ScalarVariableValidator(lp).visit();
         }
 
         private void postProcess() throws IOException {

Modified: pig/branches/tez/src/org/apache/pig/PigWarning.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigWarning.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigWarning.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigWarning.java Mon Feb 24 21:41:38 2014
@@ -66,6 +66,7 @@ public enum PigWarning {
     REDUCER_COUNT_LOW,
     NULL_COUNTER_COUNT,
     DELETE_FAILED,
-    PROJECTION_INVALID_RANGE
+    PROJECTION_INVALID_RANGE,
+    NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY
     ;
 }

Modified: pig/branches/tez/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/StoreFunc.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/StoreFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/StoreFunc.java Mon Feb 24 21:41:38 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.Tuple;
@@ -202,7 +203,6 @@ public abstract class StoreFunc implemen
      * @param warningEnum type of warning
      */
     public final void warn(String msg, Enum warningEnum) {
-        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
-        counter.increment(1);
+        PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Mon Feb 24 21:41:38 2014
@@ -18,16 +18,16 @@
 
 package org.apache.pig.backend.hadoop.datastorage;
 
-import java.lang.reflect.Method;
 import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Map.Entry;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.ExecType;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 
 public class ConfigurationUtil {
@@ -83,21 +83,10 @@ public class ConfigurationUtil {
             // so build/classes/hadoop-site.xml contains such entry. This prevents some tests from
             // successful (They expect those files in hdfs), so we need to unset it in hadoop 23.
             // This should go away once MiniMRCluster fix the distributed cache issue.
-            Method unsetMethod = null;
-            try {
-                unsetMethod = localConf.getClass().getMethod("unset", new Class[]{String.class});
-            } catch (Exception e) {
-            }
-            if (unsetMethod!=null) {
-                try {
-                    unsetMethod.invoke(localConf, new Object[]{"mapreduce.job.cache.files"});
-                } catch (Exception e) {
-                    // Should not happen
-                }
-            }
+            HadoopShims.unsetConf(localConf, "mapreduce.job.cache.files");
         }
+        localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
         Properties props = ConfigurationUtil.toProperties(localConf);
-        props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
         return props;
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Mon Feb 24 21:41:38 2014
@@ -1,204 +1,208 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.backend.hadoop.datastorage;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.Map;
-import java.util.HashMap;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.pig.PigException;
-import org.apache.pig.backend.datastorage.*;
-import org.apache.pig.backend.executionengine.ExecException;
-
-public abstract class HPath implements ElementDescriptor {
-
-    protected Path path;
-    protected HDataStorage fs;
-
-    public HPath(HDataStorage fs, Path parent, Path child) {
-        this.path = new Path(parent, child);
-        this.fs = fs;
-    }
-
-    public HPath(HDataStorage fs, String parent, String child) {
-        this(fs, new Path(parent), new Path(child));
-    }
-    
-    public HPath(HDataStorage fs, Path parent, String child) {
-        this(fs, parent, new Path(child));
-    }
-
-    public HPath(HDataStorage fs, String parent, Path child) {
-        this(fs, new Path(parent), child);
-    }
-        
-    public HPath(HDataStorage fs, String pathString) {
-        this(fs, new Path(pathString));
-    }
-        
-    public HPath(HDataStorage fs, Path path) {
-        this.path = path;
-        this.fs = fs;
-    }
-
-    public DataStorage getDataStorage() {
-        return fs;
-    }
-    
-    public abstract OutputStream create(Properties configuration) 
-             throws IOException;
-    
-    public void copy(ElementDescriptor dstName,
-                     Properties dstConfiguration,
-                     boolean removeSrc)
-            throws IOException {
-        FileSystem srcFS = this.fs.getHFS();
-        FileSystem dstFS = ((HPath)dstName).fs.getHFS();
-        
-        Path srcPath = this.path;
-        Path dstPath = ((HPath)dstName).path;
-        
-        boolean result = FileUtil.copy(srcFS,
-                                       srcPath,
-                                       dstFS,
-                                       dstPath,
-                                       false,
-                                       new Configuration());
-        
-        if (!result) {
-            int errCode = 2097;
-            String msg = "Failed to copy from: " + this.toString() +
-            " to: " + dstName.toString();
-            throw new ExecException(msg, errCode, PigException.BUG);
-        }
-    }
-    
-    public abstract InputStream open() throws IOException;
-
-    public abstract SeekableInputStream sopen() throws IOException;
-
-    public boolean exists() throws IOException {
-        return fs.getHFS().exists(path);
-    }
-    
-    public void rename(ElementDescriptor newName) 
-             throws IOException {
-        if (newName != null) {
-            fs.getHFS().rename(path, ((HPath)newName).path);
-        }
-    }
-
-    public void delete() throws IOException {
-        // the file is removed and not placed in the trash bin
-        fs.getHFS().delete(path, true);
-    }
-
-    public Properties getConfiguration() throws IOException {
-        HConfiguration props = new HConfiguration();
-
-        long blockSize = fs.getHFS().getFileStatus(path).getBlockSize();
-
-        short replication = fs.getHFS().getFileStatus(path).getReplication();
-        
-        props.setProperty(BLOCK_SIZE_KEY, (Long.valueOf(blockSize)).toString());
-        props.setProperty(BLOCK_REPLICATION_KEY, (Short.valueOf(replication)).toString());
-        
-        return props;
-    }
-
-    public void updateConfiguration(Properties newConfig) throws IOException {
-        if (newConfig == null) {
-            return;
-        }
-        
-        String blkReplStr = newConfig.getProperty(BLOCK_REPLICATION_KEY);
-        
-        fs.getHFS().setReplication(path, 
-                                   new Short(blkReplStr).shortValue());    
-    }
-
-    public Map<String, Object> getStatistics() throws IOException {
-        HashMap<String, Object> props = new HashMap<String, Object>();
-        
-        FileStatus fileStatus = fs.getHFS().getFileStatus(path);
-
-        props.put(BLOCK_SIZE_KEY, fileStatus.getBlockSize());
-        props.put(BLOCK_REPLICATION_KEY, fileStatus.getReplication());
-        props.put(LENGTH_KEY, fileStatus.getLen());
-        props.put(MODIFICATION_TIME_KEY, fileStatus.getModificationTime());
-        
-        return props;
-    }
-
-    public OutputStream create() throws IOException {
-        return create(null);
-    }
-
-    public void copy(ElementDescriptor dstName,
-                     boolean removeSrc) 
-            throws IOException {
-        copy(dstName, null, removeSrc);
-    }
-    
-    public Path getPath() {
-        return path;
-    }
-    
-    public FileSystem getHFS() {
-        return fs.getHFS();
-    }
-
-    public boolean systemElement() {
-        return (path != null && 
-                (path.getName().startsWith("_") ||
-                 path.getName().startsWith(".")));
-    }
-   
-    @Override
-    public String toString() {
-        return path.makeQualified(getHFS()).toString();
-    }
-    
-    @Override
-    public boolean equals(Object obj) {
-        if (! (obj instanceof HPath)) {
-            return false;
-        }
-        
-        return this.path.equals(((HPath)obj).path);  
-    }
-    
-    public int compareTo(ElementDescriptor other) {
-        return path.compareTo(((HPath)other).path);
-    }
-    
-    @Override
-    public int hashCode() {
-        return this.path.hashCode();
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.datastorage.*;
+import org.apache.pig.backend.executionengine.ExecException;
+
+public abstract class HPath implements ElementDescriptor {
+
+    protected Path path;
+    protected HDataStorage fs;
+
+    public HPath(HDataStorage fs, Path parent, Path child) {
+        this.path = new Path(parent, child);
+        this.fs = fs;
+    }
+
+    public HPath(HDataStorage fs, String parent, String child) {
+        this(fs, new Path(parent), new Path(child));
+    }
+
+    public HPath(HDataStorage fs, Path parent, String child) {
+        this(fs, parent, new Path(child));
+    }
+
+    public HPath(HDataStorage fs, String parent, Path child) {
+        this(fs, new Path(parent), child);
+    }
+
+    public HPath(HDataStorage fs, String pathString) {
+        this(fs, new Path(pathString));
+    }
+
+    public HPath(HDataStorage fs, Path path) {
+        this.path = path;
+        this.fs = fs;
+    }
+
+    public DataStorage getDataStorage() {
+        return fs;
+    }
+
+    public abstract OutputStream create(Properties configuration)
+             throws IOException;
+
+    public void copy(ElementDescriptor dstName,
+                     Properties dstConfiguration,
+                     boolean removeSrc)
+            throws IOException {
+        FileSystem srcFS = this.fs.getHFS();
+        FileSystem dstFS = ((HPath)dstName).fs.getHFS();
+
+        Path srcPath = this.path;
+        Path dstPath = ((HPath)dstName).path;
+
+        boolean result = FileUtil.copy(srcFS,
+                                       srcPath,
+                                       dstFS,
+                                       dstPath,
+                                       false,
+                                       new Configuration());
+
+        if (!result) {
+            int errCode = 2097;
+            String msg = "Failed to copy from: " + this.toString() +
+            " to: " + dstName.toString();
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    public abstract InputStream open() throws IOException;
+
+    public abstract SeekableInputStream sopen() throws IOException;
+
+    public boolean exists() throws IOException {
+        return fs.getHFS().exists(path);
+    }
+
+    public void rename(ElementDescriptor newName)
+             throws IOException {
+        if (newName != null) {
+            fs.getHFS().rename(path, ((HPath)newName).path);
+        }
+    }
+
+    public void delete() throws IOException {
+        // the file is removed and not placed in the trash bin
+        fs.getHFS().delete(path, true);
+    }
+
+    public void setPermission(FsPermission permission) throws IOException {
+        fs.getHFS().setPermission(path, permission);
+    }
+
+    public Properties getConfiguration() throws IOException {
+        HConfiguration props = new HConfiguration();
+
+        long blockSize = fs.getHFS().getFileStatus(path).getBlockSize();
+
+        short replication = fs.getHFS().getFileStatus(path).getReplication();
+
+        props.setProperty(BLOCK_SIZE_KEY, (Long.valueOf(blockSize)).toString());
+        props.setProperty(BLOCK_REPLICATION_KEY, (Short.valueOf(replication)).toString());
+
+        return props;
+    }
+
+    public void updateConfiguration(Properties newConfig) throws IOException {
+        if (newConfig == null) {
+            return;
+        }
+
+        String blkReplStr = newConfig.getProperty(BLOCK_REPLICATION_KEY);
+
+        fs.getHFS().setReplication(path,
+                                   new Short(blkReplStr).shortValue());
+    }
+
+    public Map<String, Object> getStatistics() throws IOException {
+        HashMap<String, Object> props = new HashMap<String, Object>();
+
+        FileStatus fileStatus = fs.getHFS().getFileStatus(path);
+
+        props.put(BLOCK_SIZE_KEY, fileStatus.getBlockSize());
+        props.put(BLOCK_REPLICATION_KEY, fileStatus.getReplication());
+        props.put(LENGTH_KEY, fileStatus.getLen());
+        props.put(MODIFICATION_TIME_KEY, fileStatus.getModificationTime());
+
+        return props;
+    }
+
+    public OutputStream create() throws IOException {
+        return create(null);
+    }
+
+    public void copy(ElementDescriptor dstName,
+                     boolean removeSrc)
+            throws IOException {
+        copy(dstName, null, removeSrc);
+    }
+
+    public Path getPath() {
+        return path;
+    }
+
+    public FileSystem getHFS() {
+        return fs.getHFS();
+    }
+
+    public boolean systemElement() {
+        return (path != null &&
+                (path.getName().startsWith("_") ||
+                 path.getName().startsWith(".")));
+    }
+
+    @Override
+    public String toString() {
+        return path.makeQualified(getHFS()).toString();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (! (obj instanceof HPath)) {
+            return false;
+        }
+
+        return this.path.equals(((HPath)obj).path);
+    }
+
+    public int compareTo(ElementDescriptor other) {
+        return path.compareTo(((HPath)other).path);
+    }
+
+    @Override
+    public int hashCode() {
+        return this.path.hashCode();
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Feb 24 21:41:38 2014
@@ -40,7 +40,8 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.executionengine.ExecutionEngine;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -76,17 +77,21 @@ public abstract class HExecutionEngine i
 
     private static final Log LOG = LogFactory.getLog(HExecutionEngine.class);
 
-    public static final String LOCAL = "local";
-    public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
-    public static final String FILE_SYSTEM_LOCATION = "fs.default.name";
-    public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
     public static final String HADOOP_SITE = "hadoop-site.xml";
     public static final String CORE_SITE = "core-site.xml";
     public static final String YARN_SITE = "yarn-site.xml";
+    public static final String CORE_DEFAULT_SITE = "core-default.xml";
+    public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml";
+    public static final String YARN_DEFAULT_SITE = "yarn-default.xml";
+
+    public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
+    public static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+    public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
+    public static final String MAPREDUCE_FRAMEWORK_NAME = "mapreduce.framework.name";
+    public static final String LOCAL = "local";
 
     protected PigContext pigContext;
     protected DataStorage ds;
-    protected JobConf jobConf;
     protected Launcher launcher;
 
     // key: the operator key from the logical plan that originated the physical plan
@@ -98,12 +103,14 @@ public abstract class HExecutionEngine i
     public HExecutionEngine(PigContext pigContext) {
         this.pigContext = pigContext;
         this.ds = null;
-        this.jobConf = null;
         this.logicalToPhysicalKeys = Maps.newHashMap();
     }
 
+    @Deprecated
     public JobConf getJobConf() {
-        return this.jobConf;
+        JobConf jc = new JobConf(false);
+        Utils.recomputeProperties(jc, pigContext.getProperties());
+        return jc;
     }
 
     public DataStorage getDataStorage() {
@@ -114,7 +121,52 @@ public abstract class HExecutionEngine i
         init(this.pigContext.getProperties());
     }
 
-    @SuppressWarnings("resource")
+    public JobConf getLocalConf(Properties properties) {
+        JobConf jc = new JobConf(false);
+
+        // If we are running in local mode we don't read the hadoop conf file
+        if (properties.getProperty(MAPREDUCE_FRAMEWORK_NAME) == null) {
+            jc.set(MAPREDUCE_FRAMEWORK_NAME, LOCAL);
+        }
+        jc.set(JOB_TRACKER_LOCATION, LOCAL);
+        jc.set(FILE_SYSTEM_LOCATION, "file:///");
+        jc.set(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
+
+        jc.addResource(MAPRED_DEFAULT_SITE);
+        jc.addResource(YARN_DEFAULT_SITE);
+        jc.addResource(CORE_DEFAULT_SITE);
+
+        return jc;
+    }
+
+    public JobConf getExecConf(Properties properties) throws ExecException {
+        JobConf jc = null;
+        // Check existence of user provided configs
+        String isHadoopConfigsOverriden = properties.getProperty("pig.use.overriden.hadoop.configs");
+        if (isHadoopConfigsOverriden != null && isHadoopConfigsOverriden.equals("true")) {
+            jc = new JobConf(ConfigurationUtil.toConfiguration(properties));
+        } else {
+            // Check existence of hadoop-site.xml or core-site.xml in
+            // classpath if user provided confs are not being used
+            Configuration testConf = new Configuration();
+            ClassLoader cl = testConf.getClassLoader();
+            URL hadoop_site = cl.getResource(HADOOP_SITE);
+            URL core_site = cl.getResource(CORE_SITE);
+
+            if (hadoop_site == null && core_site == null) {
+                throw new ExecException(
+                        "Cannot find hadoop configurations in classpath "
+                                + "(neither hadoop-site.xml nor core-site.xml was found in the classpath)."
+                                + " If you plan to use local mode, please put -x local option in command line",
+                                4010);
+            }
+            jc = new JobConf();
+        }
+        jc.addResource("pig-cluster-hadoop-site.xml");
+        jc.addResource(YARN_SITE);
+        return jc;
+    }
+
     private void init(Properties properties) throws ExecException {
         String cluster = null;
         String nameNode = null;
@@ -138,54 +190,20 @@ public abstract class HExecutionEngine i
 
         JobConf jc = null;
         if (!this.pigContext.getExecType().isLocal()) {
-            // Check existence of user provided configs
-            String isHadoopConfigsOverriden = properties.getProperty("pig.use.overriden.hadoop.configs");
-            if (isHadoopConfigsOverriden != null && isHadoopConfigsOverriden.equals("true")) {
-                jc = new JobConf(ConfigurationUtil.toConfiguration(properties));
-            } else {
-                // Check existence of hadoop-site.xml or core-site.xml in
-                // classpath if user provided confs are not being used
-                Configuration testConf = new Configuration();
-                ClassLoader cl = testConf.getClassLoader();
-                URL hadoop_site = cl.getResource(HADOOP_SITE);
-                URL core_site = cl.getResource(CORE_SITE);
-
-                if (hadoop_site == null && core_site == null) {
-                    throw new ExecException(
-                            "Cannot find hadoop configurations in classpath "
-                                    + "(neither hadoop-site.xml nor core-site.xml was found in the classpath)."
-                                    + " If you plan to use local mode, please put -x local option in command line",
-                            4010);
-                }
-                jc = new JobConf();
-            }
-            jc.addResource("pig-cluster-hadoop-site.xml");
-            jc.addResource(YARN_SITE);
+            jc = getExecConf(properties);
 
             // Trick to invoke static initializer of DistributedFileSystem to
             // add hdfs-default.xml into configuration
             new DistributedFileSystem();
-
-            // the method below alters the properties object by overriding the
-            // hadoop properties with the values from properties and recomputing
-            // the properties
-            Utils.recomputeProperties(jc, properties);
         } else {
-            // If we are running in local mode we dont read the hadoop conf file
-            if (properties.getProperty("mapreduce.framework.name") == null) {
-                properties.setProperty("mapreduce.framework.name", "local");
-            }
-            properties.setProperty(JOB_TRACKER_LOCATION, LOCAL);
-            properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
-            properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
-
-            jc = new JobConf(false);
-            jc.addResource("core-default.xml");
-            jc.addResource("mapred-default.xml");
-            jc.addResource("yarn-default.xml");
-            Utils.recomputeProperties(jc, properties);
+            jc = getLocalConf(properties);
         }
 
+        // the method below alters the properties object by overriding the
+        // hadoop properties with the values from properties and recomputing
+        // the properties
+        Utils.recomputeProperties(jc, properties);
+
         cluster = jc.get(JOB_TRACKER_LOCATION);
         nameNode = jc.get(FILE_SYSTEM_LOCATION);
         if (nameNode == null) {
@@ -215,9 +233,6 @@ public abstract class HExecutionEngine i
             LOG.info("Connecting to map-reduce job tracker at: "
                     + jc.get(JOB_TRACKER_LOCATION));
         }
-
-        // Set job-specific configuration knobs
-        jobConf = jc;
     }
 
     @SuppressWarnings("unchecked")
@@ -341,6 +356,13 @@ public abstract class HExecutionEngine i
 
         try {
             PhysicalPlan pp = compile(lp, pc.getProperties());
+            // if the compiled physical plan fulfills the requirements of the
+            // fetch optimizer, further transformations / MR job creation are
+            // skipped; a SimpleFetchPigStats will be returned through which the result
+            // can be directly fetched from the underlying storage
+            if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+                return new FetchLauncher(pc).launchPig(pp);
+            }
             return launcher.launchPig(pp, grpName, pigContext);
         } catch (ExecException e) {
             throw (ExecException) e;
@@ -355,8 +377,8 @@ public abstract class HExecutionEngine i
 
     public void explain(LogicalPlan lp, PigContext pc, PrintStream ps,
             String format, boolean verbose, File file, String suffix)
-            throws PlanException, VisitorException, IOException,
-            FrontendException {
+                    throws PlanException, VisitorException, IOException,
+                    FrontendException {
 
         PrintStream pps = ps;
         PrintStream eps = ps;
@@ -371,6 +393,10 @@ public abstract class HExecutionEngine i
             pp.explain(pps, format, verbose);
 
             MapRedUtil.checkLeafIsStore(pp, pigContext);
+            if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+                new FetchLauncher(pigContext).explain(pp, pc, eps, format);
+                return;
+            }
             launcher.explain(pp, pigContext, eps, format, verbose);
         } finally {
             launcher.reset();
@@ -383,10 +409,9 @@ public abstract class HExecutionEngine i
     }
 
     public Properties getConfiguration() {
-        if (jobConf == null) {
-            return null;
-        }
-        return ConfigurationUtil.toProperties(jobConf);
+        Properties properties = new Properties();
+        properties.putAll(pigContext.getProperties());
+        return properties;
     }
 
     public void setConfiguration(Properties newConfiguration) throws ExecException {
@@ -396,7 +421,6 @@ public abstract class HExecutionEngine i
     public void setProperty(String property, String value) {
         Properties properties = pigContext.getProperties();
         properties.put(property, value);
-        Utils.recomputeProperties(jobConf, properties);
     }
 
     public ExecutableManager getExecutableManager() {
@@ -405,7 +429,7 @@ public abstract class HExecutionEngine i
 
     public void killJob(String jobID) throws BackendException {
         if (launcher != null) {
-            launcher.killJob(jobID, jobConf);
+            launcher.killJob(jobID, getJobConf());
         }
     }
 

Copied: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (from r1571421, pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java)
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?p2=pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java&p1=pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java&r1=1571421&r2=1571454&rev=1571454&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Mon Feb 24 21:41:38 2014
@@ -32,19 +32,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -231,16 +228,6 @@ public class FetchOptimizer {
         }
 
         @Override
-        public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException {
-            planFetchable = false;
-        }
-
-        @Override
-        public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException {
-            planFetchable = false;
-        }
-
-        @Override
         public void visitSplit(POSplit spl) throws VisitorException {
             planFetchable = false;
         }
@@ -271,11 +258,6 @@ public class FetchOptimizer {
         }
 
         @Override
-        public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException {
-            planFetchable = false;
-        }
-
-        @Override
         public void visitCross(POCross cross) throws VisitorException {
             planFetchable = false;
         }