Posted to commits@pig.apache.org by ch...@apache.org on 2014/02/24 22:41:41 UTC
svn commit: r1571454 [2/5] - in /pig/branches/tez: ./ conf/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/
contrib/piggybank/java/src/main/java/...
Modified: pig/branches/tez/src/docs/src/documentation/content/xdocs/func.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/docs/src/documentation/content/xdocs/func.xml?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/docs/src/documentation/content/xdocs/func.xml (original)
+++ pig/branches/tez/src/docs/src/documentation/content/xdocs/func.xml Mon Feb 24 21:41:38 2014
@@ -2152,8 +2152,165 @@ STORE measurements INTO 'measurements' U
AvroStorage. See <a href="#AvroStorage">AvroStorage</a> for a detailed description of the
arguments for TrevniStorage.</p>
</section>
+
+ <!-- ++++++++++++++++++++++++++++++++++++++++++++++ -->
+ <section id="AccumuloStorage">
+ <title>AccumuloStorage</title>
+ <p>Loads data from or stores data into an Accumulo table. The first element in a Tuple is equivalent to the "row"
+ from the Accumulo Key, while the columns in that row can be grouped in various static or wildcarded
+ ways. Basic wildcarding functionality exists to group various column families/qualifiers into a Map for
+ LOADs, or serialize a Map into some group of column families or qualifiers on STOREs.
+ </p>
+
+ <section>
+ <title>Syntax</title>
+ <table>
+ <tr>
+ <td>
+ <p>AccumuloStorage(['columns'[, 'options']])</p>
+ </td>
+ </tr>
+ </table>
+ </section>
+
+ <section>
+ <title>Arguments</title>
+ <table>
+ <tr>
+ <td>
+ <p>'columns'</p>
+ </td>
+ <td>
+ <p>A comma-separated list of "columns" to read data from or write data to.
+ Each of these columns can be considered one of three different types:
+ </p>
+
+ <ol>
+ <li>Literal</li>
+ <li>Column family prefix</li>
+ <li>Column qualifier prefix</li>
+ </ol>
+
+ <p><strong>Literal:</strong> This is the simplest specification: a colon-delimited
+ string that maps to a column family and column qualifier. It reads/writes a single
+ scalar value from/to Accumulo.
+ </p>
+
+ <p><strong>Column family prefix:</strong> When reading data, this
+ will fetch data from Accumulo Key-Values in the current row whose column family matches the
+ given prefix. This will result in a Map being placed into the Tuple. When writing
+ data, a Map is also expected at the given offset in the Tuple; its Keys will be
+ appended to the column family prefix, an empty column qualifier is used, and the Map
+ value will be placed in the Accumulo Value. A literal asterisk (*) is also a valid column
+ family prefix, in which case the Map Key will be equivalent to the Accumulo column family.
+ </p>
+
+ <p><strong>Column qualifier prefix:</strong> Similar to the column
+ family prefix except it operates on the column qualifier. On reads, Accumulo Key-Values
+ in the same row that match the given column family and column qualifier prefix will be
+ placed into a single Map. On writes, the provided column family from the column specification
+ will be used, the Map key will be appended to the column qualifier provided in the specification,
+ and the Map Value will be the Accumulo Value.
+ </p>
+
+ <p>When "columns" is not provided or is a blank String, it is treated equivalently to "*".
+ This is to say that when a column specification string is not provided, for reads, all columns
+ in the given Accumulo row will be placed into a single Map (with the Map keys being colon
+ delimited to preserve the column family/qualifier from Accumulo). For writes, the Map keys
+ will be placed into the column family and the column qualifier will be empty.
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td>
+ <p>'options'</p>
+ </td>
+ <td>
+ <p>A string that contains space-separated options ("-optionA valueA -optionB valueB -optionC valueC")</p>
+ <p>The currently supported options are:</p>
+ <ul>
+ <li>(-c|--caster) LoadStoreCasterImpl An implementation of a LoadStoreCaster to use when serializing types into Accumulo,
+ usually AccumuloBinaryConverter or UTF8StringConverter, defaults to UTF8StorageConverter.
+ </li>
+ <li>(-auths|--authorizations) auth1,auth2... A comma-separated list of Accumulo authorizations to use when reading
+ data from Accumulo. Defaults to the empty set of authorizations (none).
+ </li>
+ <li>(-s|--start) start_row The Accumulo row to begin reading from, inclusive.</li>
+ <li>(-e|--end) end_row The Accumulo row to read until, inclusive.</li>
+ <li>(-buff|--mutation-buffer-size) num_bytes The number of bytes to buffer when writing data to Accumulo. A higher
+ value requires more memory.</li>
+ <li>(-wt|--write-threads) num_threads The number of threads used to write data to Accumulo.</li>
+ <li>(-ml|--max-latency) milliseconds Maximum time in milliseconds before data is flushed to Accumulo.</li>
+ <li>(-sep|--separator) str The separator character used when parsing the column specification; defaults to comma (,).</li>
+ <li>(-iw|--ignore-whitespace) (true|false) Whether whitespace should be stripped from the column specification; defaults to true.</li>
+ </ul>
+ </td>
+ </tr>
+ </table>
+ </section>
+
+ <section>
+ <title>Usage</title>
+
+ <p>AccumuloStorage can store data into or fetch data from Accumulo. Its goal is to provide
+ a simple, widely applicable table schema compatible with Pig's API. Each Tuple contains some subset
+ of the columns stored within one row of the Accumulo table, which depends on the columns provided
+ as the first argument to the function. If '*' is provided, all columns in the row will be returned. The
+ second argument controls a variety of options, such as authorizations, the row range, and write buffering.</p>
+ <p>When invoking Pig Scripts that use AccumuloStorage, it's important to ensure that Pig has the Accumulo
+ jars on its classpath. This is easily achieved using the ACCUMULO_HOME environment variable.
+ </p>
+<source>
+PIG_CLASSPATH="$ACCUMULO_HOME/lib/*:$PIG_CLASSPATH" pig my_script.pig
+</source>
+ </section>
+
+ <section>
+ <title>Load Example</title>
+ <p>It is simple to fetch all columns for airport codes that fall between BOS (Boston) and SFO (San Francisco),
+ both inclusive and compared lexicographically as Accumulo rows, that can be viewed with the 'auth1' and/or 'auth2' Accumulo authorizations.</p>
+<source>
+raw = LOAD 'accumulo://airports?instance=accumulo&user=root&password=passwd&zookeepers=localhost'
+ USING org.apache.pig.backend.hadoop.accumulo.AccumuloStorage(
+ '*', '-auths auth1,auth2 -s BOS -e SFO') AS
+ (code:chararray, all_columns:map[]);
+</source>
+ <p>The datatypes of the columns are declared with the "AS" clause. In this example, the row key,
+ which is the unique airport code, is assigned to the "code" variable, while all of the other
+ columns are placed into the map. When there is a non-empty column qualifier, the key in that
+ map will contain a colon separating the portion that came from the column family from the
+ portion that came from the column qualifier. The Accumulo value is placed in the Map value.</p>
+
+ <p>In most cases, it is neither necessary nor desirable (for performance reasons) to fetch all columns.</p>
+<source>
+raw = LOAD 'accumulo://airports?instance=accumulo&user=root&password=passwd&zookeepers=localhost'
+ USING org.apache.pig.backend.hadoop.accumulo.AccumuloStorage(
+ 'name,building:num_terminals,carrier*,reviews:transportation*') AS
+ (code:chararray, name:bytearray, num_terminals:bytearray, carrier_map:map[], transportation_reviews_map:map[]);
+</source>
+ <p>An asterisk can be used when requesting columns to group a collection of columns into a single
+ Map instead of enumerating each column.</p>
+ </section>
+
+ <section>
+ <title>Store Example</title>
+ <p>Data can be easily stored into Accumulo.</p>
+<source>
+A = LOAD 'flights.txt' AS (id:chararray, carrier_name:chararray, src_airport:chararray, dest_airport:chararray, tail_number:int);
+STORE A INTO 'accumulo://flights?instance=accumulo&user=root&password=passwd&zookeepers=localhost' USING
+ org.apache.pig.backend.hadoop.accumulo.AccumuloStorage('carrier_name,src_airport,dest_airport,tail_number');
+</source>
+ <p>Here, we load the file 'flights.txt' from HDFS into the relation A, extracting a unique ID for
+ the flight, the carrier name, its source and destination airports, and the tail number.
+ When storing back into Accumulo, we specify the column specifications (in this case,
+ just a column family for each field). Note that only four elements are provided as columns
+ because the first element in the Tuple (the flight ID) is used as the row in Accumulo.
+ </p>
+ </section>
+ </section>
</section>
+
<!-- ======================================================== -->
<!-- ======================================================== -->
<!-- Math Functions -->
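The three column types described for the AccumuloStorage 'columns' argument can be illustrated with a small, self-contained Java sketch. It is not Pig's actual parser: the class ColumnSpecSketch, its ColumnType enum, and the parse method are hypothetical. It assumes that a trailing asterisk marks a prefix, that a colon separates family from qualifier, and that an entry with no colon (such as 'name' or 'carrier_name' in the examples above) is a literal column family with an empty qualifier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch: classifies AccumuloStorage column-specification entries
// into the three documented types. This is NOT Pig's real parser.
public class ColumnSpecSketch {

    /** The three kinds of entries described for the 'columns' argument. */
    enum ColumnType { LITERAL, COLFAM_PREFIX, COLQUAL_PREFIX }

    /** One parsed entry; qualifier is "" when the entry names only a family. */
    static final class Column {
        final ColumnType type;
        final String family;
        final String qualifier;

        Column(ColumnType type, String family, String qualifier) {
            this.type = type;
            this.family = family;
            this.qualifier = qualifier;
        }

        @Override
        public String toString() {
            return type + "(family=" + family + ", qualifier=" + qualifier + ")";
        }
    }

    /**
     * Splits the specification on the separator (documented default ",") and
     * classifies each entry. A null or blank spec is treated as "*".
     */
    static List<Column> parse(String spec, String separator, boolean ignoreWhitespace) {
        if (spec == null || spec.trim().isEmpty()) {
            spec = "*";
        }
        List<Column> columns = new ArrayList<>();
        // Pattern.quote makes a separator such as "|" match literally, not as a regex
        for (String raw : spec.split(Pattern.quote(separator))) {
            String entry = ignoreWhitespace ? raw.trim() : raw;
            if (entry.isEmpty()) {
                continue;
            }
            boolean prefix = entry.endsWith("*");
            String body = prefix ? entry.substring(0, entry.length() - 1) : entry;
            int colon = body.indexOf(':');
            if (colon < 0) {
                // "carrier*" or "*" -> family prefix; "name" -> family-only literal
                columns.add(new Column(prefix ? ColumnType.COLFAM_PREFIX : ColumnType.LITERAL, body, ""));
            } else {
                // "reviews:transportation*" -> qualifier prefix; "building:num_terminals" -> literal
                columns.add(new Column(prefix ? ColumnType.COLQUAL_PREFIX : ColumnType.LITERAL,
                        body.substring(0, colon), body.substring(colon + 1)));
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        String spec = "name,building:num_terminals,carrier*,reviews:transportation*";
        for (Column c : parse(spec, ",", true)) {
            System.out.println(c);
        }
    }
}
```

Treating a bare "*" as a family prefix with an empty family mirrors the documented behavior that '*' maps every column family into a single Map.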
Modified: pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml Mon Feb 24 21:41:38 2014
@@ -1047,6 +1047,34 @@ java -cp $PIG_HOME/pig.jar
</ul>
<p></p>
</section>
+
+<!-- +++++++++++++++++++++++++++++++ -->
+<section id="direct-fetch">
+<title>Direct Fetch</title>
+<p>When the <a href="test.html#dump">DUMP</a> operator is used to execute Pig Latin statements, Pig can minimize latency by reading data directly from HDFS rather than launching MapReduce jobs.</p>
+
+<p>
+The result is fetched directly if the query uses only the following operators:
+<a href="basic.html#filter">FILTER</a>,
+<a href="basic.html#foreach">FOREACH</a>,
+<a href="basic.html#limit">LIMIT</a>,
+<a href="basic.html#stream">STREAM</a>,
+<a href="basic.html#union">UNION</a>.
+<br></br>
+Fetching will be disabled in case of:
+</p>
+<ul>
+ <li>the presence of other operators, <a href="http://pig.apache.org/docs/r0.13.0/api/org/apache/pig/impl/builtin/SampleLoader.html">sample loaders</a> and scalar expressions</li>
+ <li>implicit splits</li>
+</ul>
+
+<p>
+You can check if the query can be fetched by running EXPLAIN. You should see "No MR jobs. Fetch only." in the MapReduce part of the plan.
+<br></br>
+Direct fetch is turned on by default. To turn it off, set the property opt.fetch to false or start Pig with the "-N" or "-no_fetch" option.
+</p>
+
+</section>
</section>
<!-- ==================================================================== -->
@@ -1113,9 +1141,9 @@ associated with a given key is too large
<title>Usage</title>
<p>Perform a skewed join with the USING clause (see <a href="basic.html#JOIN-inner">JOIN (inner)</a> and <a href="basic.html#JOIN-outer">JOIN (outer)</a>). </p>
<source>
-big = LOAD 'big_data' AS (b1,b2,b3);
-massive = LOAD 'massive_data' AS (m1,m2,m3);
-C = JOIN big BY b1, massive BY m1 USING 'skewed';
+A = LOAD 'skewed_data' AS (a1,a2,a3);
+B = LOAD 'data' AS (b1,b2,b3);
+C = JOIN A BY a1, B BY b1 USING 'skewed';
</source>
</section>
@@ -1125,8 +1153,9 @@ C = JOIN big BY b1, massive BY m1 USING
Skewed join will only work under these conditions:
</p>
<ul>
-<li>Skewed join works with two-table inner join. Currently we do not support more than two tables for skewed join.
+<li>Skewed join works with two-table inner and outer join. Currently we do not support more than two tables for skewed join.
Specifying three-way (or more) joins will fail validation. For such joins, we rely on you to break them up into two-way joins.</li>
+<li>The skewed table must be specified as the left table. Pig samples on that table and determines the number of reducers per key.</li>
<li>The pig.skewedjoin.reduce.memusage Java parameter specifies the fraction of heap available for the
reducer to perform the join. A low fraction forces Pig to use more reducers but increases
copying cost. We have seen good performance when we set this value
Modified: pig/branches/tez/src/org/apache/pig/ExecTypeProvider.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/ExecTypeProvider.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/ExecTypeProvider.java (original)
+++ pig/branches/tez/src/org/apache/pig/ExecTypeProvider.java Mon Feb 24 21:41:38 2014
@@ -40,7 +40,7 @@ public class ExecTypeProvider {
for (ExecType execType : frameworkLoader) {
log.info("Trying ExecType : " + execType);
if (execType.accepts(properties)) {
- log.debug("Picked " + execType + " as the ExecType");
+ log.info("Picked " + execType + " as the ExecType");
return getSingleton(execType);
} else {
log.debug("Cannot pick " + execType + " as the ExecType");
Modified: pig/branches/tez/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/LoadFunc.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/LoadFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/LoadFunc.java Mon Feb 24 21:41:38 2014
@@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.Tuple;
@@ -302,8 +303,6 @@ public abstract class LoadFunc {
* @param warningEnum type of warning
*/
public final void warn(String msg, Enum warningEnum) {
- Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
- if (counter!=null)
- counter.increment(1);
+ PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
}
}
Modified: pig/branches/tez/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/Main.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/Main.java (original)
+++ pig/branches/tez/src/org/apache/pig/Main.java Mon Feb 24 21:41:38 2014
@@ -208,6 +208,7 @@ public class Main {
opts.registerOpt('x', "exectype", CmdLineParser.ValueExpected.REQUIRED);
opts.registerOpt('F', "stop_on_failure", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('M', "no_multiquery", CmdLineParser.ValueExpected.NOT_ACCEPTED);
+ opts.registerOpt('N', "no_fetch", CmdLineParser.ValueExpected.NOT_ACCEPTED);
opts.registerOpt('P', "propertyFile", CmdLineParser.ValueExpected.REQUIRED);
ExecMode mode = ExecMode.UNKNOWN;
@@ -300,6 +301,10 @@ public class Main {
properties.setProperty(PigConfiguration.OPT_MULTIQUERY,""+false);
break;
+ case 'N':
+ properties.setProperty(PigConfiguration.OPT_FETCH,""+false);
+ break;
+
case 'p':
params.add(opts.getValStr());
break;
@@ -863,6 +868,7 @@ public class Main {
System.out.println(" -x, -exectype - Set execution mode: local|mapreduce, default is mapreduce.");
System.out.println(" -F, -stop_on_failure - Aborts execution on the first failed job; default is off");
System.out.println(" -M, -no_multiquery - Turn multiquery optimization off; default is on");
+ System.out.println(" -N, -no_fetch - Turn fetch optimization off; default is on");
System.out.println(" -P, -propertyFile - Path to property file");
System.out.println(" -printCmdDebug - Overrides anything else and prints the actual command used to run Pig, including");
System.out.println(" any environment variables that are set by the pig command.");
@@ -885,6 +891,8 @@ public class Main {
System.out.println(" Only disable combiner as a temporary workaround for problems.");
System.out.println(" opt.multiquery=true|false; multiquery is on by default.");
System.out.println(" Only disable multiquery as a temporary workaround for problems.");
+ System.out.println(" opt.fetch=true|false; fetch is on by default.");
+ System.out.println(" Scripts containing only Filter, Foreach, Limit, Stream, and Union can be dumped without MR jobs.");
System.out.println(" pig.tmpfilecompression=true|false; compression is off by default.");
System.out.println(" Determines whether output of intermediate jobs is compressed.");
System.out.println(" pig.tmpfilecompression.codec=lzo|gzip; default is gzip.");
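The Direct Fetch rules in perf.xml and the new -N / opt.fetch switch combine into a simple eligibility check. The Java sketch below restates those rules under stated assumptions; it is not Pig's implementation. The class FetchDecisionSketch, its methods, and the representation of a plan as a list of operator names are hypothetical, and treating LOAD and STORE as always allowed (every DUMP plan contains them) is an assumption.

```java
import java.util.List;
import java.util.Properties;
import java.util.Set;

// Hypothetical sketch of the direct-fetch rules described in perf.xml;
// not Pig's actual fetch optimizer.
public class FetchDecisionSketch {

    // Operators listed as fetchable, plus LOAD and STORE, which every DUMP
    // plan contains (an assumption of this sketch's plan representation).
    static final Set<String> FETCHABLE =
            Set.of("LOAD", "STORE", "FILTER", "FOREACH", "LIMIT", "STREAM", "UNION");

    /** opt.fetch defaults to true; the -N / -no_fetch switch sets it to "false". */
    static boolean fetchEnabled(Properties props) {
        return Boolean.parseBoolean(props.getProperty("opt.fetch", "true"));
    }

    /**
     * True when a DUMP could be served by direct fetch: the optimization is on,
     * every operator is fetchable, and there are no implicit splits or scalars.
     */
    static boolean canFetch(Properties props, List<String> operators,
                            boolean hasImplicitSplit, boolean hasScalarExpression) {
        if (!fetchEnabled(props) || hasImplicitSplit || hasScalarExpression) {
            return false;
        }
        for (String op : operators) {
            if (!FETCHABLE.contains(op)) {
                return false; // e.g. GROUP, JOIN or ORDER disable fetching
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        List<String> plan = List.of("LOAD", "FILTER", "LIMIT", "STORE");
        System.out.println(canFetch(props, plan, false, false));
        props.setProperty("opt.fetch", "false"); // what the -N switch does
        System.out.println(canFetch(props, plan, false, false));
    }
}
```

In practice, EXPLAIN remains the authoritative check: a fetchable plan reports "No MR jobs. Fetch only."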
Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Mon Feb 24 21:41:38 2014
@@ -125,7 +125,7 @@ public class PigConfiguration {
*/
public static final String MAX_SCRIPT_SIZE = "pig.script.max.size";
- /**
+ /**
* This key is used to define whether to have intermediate file compressed
*/
public static final String PIG_ENABLE_TEMP_FILE_COMPRESSION = "pig.tmpfilecompression";
@@ -160,9 +160,9 @@ public class PigConfiguration {
public static final String PERC_MEM_AVAIL = "pig.skewedjoin.reduce.memusage";
/**
- * This key used to control the maximum size loaded into
- * the distributed cache when doing fragment-replicated join
- */
+ * This key used to control the maximum size loaded into
+ * the distributed cache when doing fragment-replicated join
+ */
public static final String PIG_JOIN_REPLICATED_MAX_BYTES = "pig.join.replicated.max.bytes";
/**
@@ -186,5 +186,49 @@ public class PigConfiguration {
* of nested distinct or sort
*/
public static final String PIG_EXEC_NO_SECONDARY_KEY = "pig.exec.nosecondarykey";
+
+ /**
+ * This key is used to control the sample size of RandomSampleLoader for
+ * order-by. The default value is 100 rows per task.
+ */
+ public static final String PIG_RANDOM_SAMPLER_SAMPLE_SIZE = "pig.random.sampler.sample.size";
+
+ /**
+ * This key is used to turn on the auto local mode feature
+ */
+ public static final String PIG_AUTO_LOCAL_ENABLED = "pig.auto.local.enabled";
+
+ /**
+ * Controls the maximum input size, in bytes, below which jobs are converted to run in local mode
+ */
+ public static final String PIG_AUTO_LOCAL_INPUT_MAXBYTES = "pig.auto.local.input.maxbytes";
+
+ /**
+ * This parameter enables/disables fetching. By default it is turned on.
+ */
+ public static final String OPT_FETCH = "opt.fetch";
+
+ /**
+ * This key is used to define whether PigOutputFormat will be wrapped with LazyOutputFormat
+ * so that jobs won't write empty part files if no output is generated
+ */
+ public static final String PIG_OUTPUT_LAZY = "pig.output.lazy";
+
+ /**
+ * Location where pig stores temporary files for job setup
+ */
+ public static final String PIG_TEMP_DIR = "pig.temp.dir";
+
+ /**
+ * This key is used to turn on the user-level cache
+ */
+ public static final String PIG_USER_CACHE_ENABLED = "pig.user.cache.enabled";
+
+ /**
+ * Location where additional jars are cached for the user.
+ * Additional jars will be cached under PIG_USER_CACHE_LOCATION/${user.name}/.pigcache
+ * and will be reused across jobs run by the user if the jar has not changed
+ */
+ public static final String PIG_USER_CACHE_LOCATION = "pig.user.cache.location";
}
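The PIG_USER_CACHE_LOCATION Javadoc describes a per-user directory layout. The Java sketch below only restates that layout; it is not Pig's code. The class UserCacheDirSketch and its userCacheDir method are hypothetical, and treating a missing pig.user.cache.enabled value as "off" is an assumption, since this diff does not state the default. No default location is assumed either.

```java
import java.util.Properties;

// Hypothetical helper illustrating the documented cache layout; not part of Pig.
public class UserCacheDirSketch {

    /**
     * Returns location/${user.name}/.pigcache, or null when the cache is not
     * enabled or no location is configured.
     */
    static String userCacheDir(Properties props, String userName) {
        // Assumption: a missing pig.user.cache.enabled is treated as "off".
        if (!Boolean.parseBoolean(props.getProperty("pig.user.cache.enabled", "false"))) {
            return null;
        }
        String location = props.getProperty("pig.user.cache.location");
        if (location == null || location.isEmpty()) {
            return null; // this sketch deliberately assumes no default location
        }
        // Strip trailing slashes so "/tmp/" and "/tmp" produce the same path
        String base = location.replaceAll("/+$", "");
        return base + "/" + userName + "/.pigcache";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("pig.user.cache.enabled", "true");
        props.setProperty("pig.user.cache.location", "/tmp");
        System.out.println(userCacheDir(props, System.getProperty("user.name")));
    }
}
```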
Modified: pig/branches/tez/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigServer.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigServer.java Mon Feb 24 21:41:38 2014
@@ -104,6 +104,7 @@ import org.apache.pig.tools.pigstats.Out
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.tools.pigstats.SimpleFetchPigStats;
import com.google.common.annotations.VisibleForTesting;
@@ -241,6 +242,7 @@ public class PigServer {
}
addJarsFromProperties();
+ markPredeployedJarsFromProperties();
if (PigStats.get() == null) {
PigStats.start(pigContext.getExecutionEngine().instantiatePigStats());
@@ -277,6 +279,22 @@ public class PigServer {
}
}
+ private void markPredeployedJarsFromProperties() throws ExecException {
+ // mark jars as predeployed from properties
+ String jar_str = pigContext.getProperties().getProperty("pig.predeployed.jars");
+
+ if(jar_str != null){
+ // Use File.pathSeparator (":" on Linux, ";" on Windows)
+ // to correctly handle path aggregates as they are represented
+ // on the Operating System.
+ for(String jar : jar_str.split(File.pathSeparator)){
+ if (jar.length() > 0) {
+ pigContext.markJarAsPredeployed(jar);
+ }
+ }
+ }
+ }
+
public PigContext getPigContext(){
return pigContext;
}
@@ -413,6 +431,12 @@ public class PigServer {
*/
protected List<ExecJob> getJobs(PigStats stats) {
LinkedList<ExecJob> jobs = new LinkedList<ExecJob>();
+ if (stats instanceof SimpleFetchPigStats) {
+ HJob job = new HJob(HJob.JOB_STATUS.COMPLETED, pigContext, stats.result(null)
+ .getPOStore(), null);
+ jobs.add(job);
+ return jobs;
+ }
JobGraph jGraph = stats.getJobGraph();
Iterator<JobStats> iter = jGraph.iterator();
while (iter.hasNext()) {
@@ -1719,6 +1743,9 @@ public class PigServer {
CompilationMessageCollector collector = new CompilationMessageCollector() ;
new TypeCheckingRelVisitor( lp, collector).visit();
+ new UnionOnSchemaSetter( lp ).visit();
+ new CastLineageSetter(lp, collector).visit();
+ new ScalarVariableValidator(lp).visit();
if(aggregateWarning) {
CompilationMessageCollector.logMessages(collector, MessageType.Warning, aggregateWarning, log);
} else {
@@ -1727,9 +1754,6 @@ public class PigServer {
}
}
- new UnionOnSchemaSetter( lp ).visit();
- new CastLineageSetter(lp, collector).visit();
- new ScalarVariableValidator(lp).visit();
}
private void postProcess() throws IOException {
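The new markPredeployedJarsFromProperties() method splits the pig.predeployed.jars property on File.pathSeparator and skips empty entries. The standalone Java sketch below reproduces just that parsing step so it can be run without a PigContext. The class PredeployedJarsSketch and its predeployedJars method are hypothetical; the real method calls pigContext.markJarAsPredeployed(jar) instead of collecting a list.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, standalone version of the parsing loop in
// PigServer.markPredeployedJarsFromProperties().
public class PredeployedJarsSketch {

    /** Splits a pig.predeployed.jars value on the OS path separator, skipping empty entries. */
    static List<String> predeployedJars(String value) {
        List<String> jars = new ArrayList<>();
        if (value == null) {
            return jars; // property not set
        }
        // File.pathSeparator is ":" on Unix-like systems and ";" on Windows;
        // neither is a regex metacharacter, so String.split() matches it literally.
        for (String jar : value.split(File.pathSeparator)) {
            if (!jar.isEmpty()) {
                jars.add(jar);
            }
        }
        return jars;
    }

    public static void main(String[] args) {
        // A doubled separator yields an empty entry, which is skipped
        String value = "/opt/libs/a.jar" + File.pathSeparator + File.pathSeparator + "/opt/libs/b.jar";
        System.out.println(predeployedJars(value));
    }
}
```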
Modified: pig/branches/tez/src/org/apache/pig/PigWarning.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigWarning.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigWarning.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigWarning.java Mon Feb 24 21:41:38 2014
@@ -66,6 +66,7 @@ public enum PigWarning {
REDUCER_COUNT_LOW,
NULL_COUNTER_COUNT,
DELETE_FAILED,
- PROJECTION_INVALID_RANGE
+ PROJECTION_INVALID_RANGE,
+ NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY
;
}
Modified: pig/branches/tez/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/StoreFunc.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/StoreFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/StoreFunc.java Mon Feb 24 21:41:38 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.Tuple;
@@ -202,7 +203,6 @@ public abstract class StoreFunc implemen
* @param warningEnum type of warning
*/
public final void warn(String msg, Enum warningEnum) {
- Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
- counter.increment(1);
+ PigHadoopLogger.getInstance().warn(this, msg, warningEnum);
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Mon Feb 24 21:41:38 2014
@@ -18,16 +18,16 @@
package org.apache.pig.backend.hadoop.datastorage;
-import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
-import java.util.Properties;
import java.util.Map.Entry;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
public class ConfigurationUtil {
@@ -83,21 +83,10 @@ public class ConfigurationUtil {
// so build/classes/hadoop-site.xml contains such entry. This prevents some tests from
// successful (They expect those files in hdfs), so we need to unset it in hadoop 23.
// This should go away once MiniMRCluster fix the distributed cache issue.
- Method unsetMethod = null;
- try {
- unsetMethod = localConf.getClass().getMethod("unset", new Class[]{String.class});
- } catch (Exception e) {
- }
- if (unsetMethod!=null) {
- try {
- unsetMethod.invoke(localConf, new Object[]{"mapreduce.job.cache.files"});
- } catch (Exception e) {
- // Should not happen
- }
- }
+ HadoopShims.unsetConf(localConf, "mapreduce.job.cache.files");
}
+ localConf.set(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
Properties props = ConfigurationUtil.toProperties(localConf);
- props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
return props;
}
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Mon Feb 24 21:41:38 2014
@@ -1,204 +1,208 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.backend.hadoop.datastorage;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.IOException;
-import java.util.Properties;
-import java.util.Map;
-import java.util.HashMap;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.pig.PigException;
-import org.apache.pig.backend.datastorage.*;
-import org.apache.pig.backend.executionengine.ExecException;
-
-public abstract class HPath implements ElementDescriptor {
-
- protected Path path;
- protected HDataStorage fs;
-
- public HPath(HDataStorage fs, Path parent, Path child) {
- this.path = new Path(parent, child);
- this.fs = fs;
- }
-
- public HPath(HDataStorage fs, String parent, String child) {
- this(fs, new Path(parent), new Path(child));
- }
-
- public HPath(HDataStorage fs, Path parent, String child) {
- this(fs, parent, new Path(child));
- }
-
- public HPath(HDataStorage fs, String parent, Path child) {
- this(fs, new Path(parent), child);
- }
-
- public HPath(HDataStorage fs, String pathString) {
- this(fs, new Path(pathString));
- }
-
- public HPath(HDataStorage fs, Path path) {
- this.path = path;
- this.fs = fs;
- }
-
- public DataStorage getDataStorage() {
- return fs;
- }
-
- public abstract OutputStream create(Properties configuration)
- throws IOException;
-
- public void copy(ElementDescriptor dstName,
- Properties dstConfiguration,
- boolean removeSrc)
- throws IOException {
- FileSystem srcFS = this.fs.getHFS();
- FileSystem dstFS = ((HPath)dstName).fs.getHFS();
-
- Path srcPath = this.path;
- Path dstPath = ((HPath)dstName).path;
-
- boolean result = FileUtil.copy(srcFS,
- srcPath,
- dstFS,
- dstPath,
- false,
- new Configuration());
-
- if (!result) {
- int errCode = 2097;
- String msg = "Failed to copy from: " + this.toString() +
- " to: " + dstName.toString();
- throw new ExecException(msg, errCode, PigException.BUG);
- }
- }
-
- public abstract InputStream open() throws IOException;
-
- public abstract SeekableInputStream sopen() throws IOException;
-
- public boolean exists() throws IOException {
- return fs.getHFS().exists(path);
- }
-
- public void rename(ElementDescriptor newName)
- throws IOException {
- if (newName != null) {
- fs.getHFS().rename(path, ((HPath)newName).path);
- }
- }
-
- public void delete() throws IOException {
- // the file is removed and not placed in the trash bin
- fs.getHFS().delete(path, true);
- }
-
- public Properties getConfiguration() throws IOException {
- HConfiguration props = new HConfiguration();
-
- long blockSize = fs.getHFS().getFileStatus(path).getBlockSize();
-
- short replication = fs.getHFS().getFileStatus(path).getReplication();
-
- props.setProperty(BLOCK_SIZE_KEY, (Long.valueOf(blockSize)).toString());
- props.setProperty(BLOCK_REPLICATION_KEY, (Short.valueOf(replication)).toString());
-
- return props;
- }
-
- public void updateConfiguration(Properties newConfig) throws IOException {
- if (newConfig == null) {
- return;
- }
-
- String blkReplStr = newConfig.getProperty(BLOCK_REPLICATION_KEY);
-
- fs.getHFS().setReplication(path,
- new Short(blkReplStr).shortValue());
- }
-
- public Map<String, Object> getStatistics() throws IOException {
- HashMap<String, Object> props = new HashMap<String, Object>();
-
- FileStatus fileStatus = fs.getHFS().getFileStatus(path);
-
- props.put(BLOCK_SIZE_KEY, fileStatus.getBlockSize());
- props.put(BLOCK_REPLICATION_KEY, fileStatus.getReplication());
- props.put(LENGTH_KEY, fileStatus.getLen());
- props.put(MODIFICATION_TIME_KEY, fileStatus.getModificationTime());
-
- return props;
- }
-
- public OutputStream create() throws IOException {
- return create(null);
- }
-
- public void copy(ElementDescriptor dstName,
- boolean removeSrc)
- throws IOException {
- copy(dstName, null, removeSrc);
- }
-
- public Path getPath() {
- return path;
- }
-
- public FileSystem getHFS() {
- return fs.getHFS();
- }
-
- public boolean systemElement() {
- return (path != null &&
- (path.getName().startsWith("_") ||
- path.getName().startsWith(".")));
- }
-
- @Override
- public String toString() {
- return path.makeQualified(getHFS()).toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (! (obj instanceof HPath)) {
- return false;
- }
-
- return this.path.equals(((HPath)obj).path);
- }
-
- public int compareTo(ElementDescriptor other) {
- return path.compareTo(((HPath)other).path);
- }
-
- @Override
- public int hashCode() {
- return this.path.hashCode();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.datastorage;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.datastorage.*;
+import org.apache.pig.backend.executionengine.ExecException;
+
+public abstract class HPath implements ElementDescriptor {
+
+ protected Path path;
+ protected HDataStorage fs;
+
+ public HPath(HDataStorage fs, Path parent, Path child) {
+ this.path = new Path(parent, child);
+ this.fs = fs;
+ }
+
+ public HPath(HDataStorage fs, String parent, String child) {
+ this(fs, new Path(parent), new Path(child));
+ }
+
+ public HPath(HDataStorage fs, Path parent, String child) {
+ this(fs, parent, new Path(child));
+ }
+
+ public HPath(HDataStorage fs, String parent, Path child) {
+ this(fs, new Path(parent), child);
+ }
+
+ public HPath(HDataStorage fs, String pathString) {
+ this(fs, new Path(pathString));
+ }
+
+ public HPath(HDataStorage fs, Path path) {
+ this.path = path;
+ this.fs = fs;
+ }
+
+ public DataStorage getDataStorage() {
+ return fs;
+ }
+
+ public abstract OutputStream create(Properties configuration)
+ throws IOException;
+
+ public void copy(ElementDescriptor dstName,
+ Properties dstConfiguration,
+ boolean removeSrc)
+ throws IOException {
+ FileSystem srcFS = this.fs.getHFS();
+ FileSystem dstFS = ((HPath)dstName).fs.getHFS();
+
+ Path srcPath = this.path;
+ Path dstPath = ((HPath)dstName).path;
+
+ boolean result = FileUtil.copy(srcFS,
+ srcPath,
+ dstFS,
+ dstPath,
+ false,
+ new Configuration());
+
+ if (!result) {
+ int errCode = 2097;
+ String msg = "Failed to copy from: " + this.toString() +
+ " to: " + dstName.toString();
+ throw new ExecException(msg, errCode, PigException.BUG);
+ }
+ }
+
+ public abstract InputStream open() throws IOException;
+
+ public abstract SeekableInputStream sopen() throws IOException;
+
+ public boolean exists() throws IOException {
+ return fs.getHFS().exists(path);
+ }
+
+ public void rename(ElementDescriptor newName)
+ throws IOException {
+ if (newName != null) {
+ fs.getHFS().rename(path, ((HPath)newName).path);
+ }
+ }
+
+ public void delete() throws IOException {
+ // the file is removed and not placed in the trash bin
+ fs.getHFS().delete(path, true);
+ }
+
+ public void setPermission(FsPermission permission) throws IOException {
+ fs.getHFS().setPermission(path, permission);
+ }
+
+ public Properties getConfiguration() throws IOException {
+ HConfiguration props = new HConfiguration();
+
+ long blockSize = fs.getHFS().getFileStatus(path).getBlockSize();
+
+ short replication = fs.getHFS().getFileStatus(path).getReplication();
+
+ props.setProperty(BLOCK_SIZE_KEY, (Long.valueOf(blockSize)).toString());
+ props.setProperty(BLOCK_REPLICATION_KEY, (Short.valueOf(replication)).toString());
+
+ return props;
+ }
+
+ public void updateConfiguration(Properties newConfig) throws IOException {
+ if (newConfig == null) {
+ return;
+ }
+
+ String blkReplStr = newConfig.getProperty(BLOCK_REPLICATION_KEY);
+
+ fs.getHFS().setReplication(path,
+ new Short(blkReplStr).shortValue());
+ }
+
+ public Map<String, Object> getStatistics() throws IOException {
+ HashMap<String, Object> props = new HashMap<String, Object>();
+
+ FileStatus fileStatus = fs.getHFS().getFileStatus(path);
+
+ props.put(BLOCK_SIZE_KEY, fileStatus.getBlockSize());
+ props.put(BLOCK_REPLICATION_KEY, fileStatus.getReplication());
+ props.put(LENGTH_KEY, fileStatus.getLen());
+ props.put(MODIFICATION_TIME_KEY, fileStatus.getModificationTime());
+
+ return props;
+ }
+
+ public OutputStream create() throws IOException {
+ return create(null);
+ }
+
+ public void copy(ElementDescriptor dstName,
+ boolean removeSrc)
+ throws IOException {
+ copy(dstName, null, removeSrc);
+ }
+
+ public Path getPath() {
+ return path;
+ }
+
+ public FileSystem getHFS() {
+ return fs.getHFS();
+ }
+
+ public boolean systemElement() {
+ return (path != null &&
+ (path.getName().startsWith("_") ||
+ path.getName().startsWith(".")));
+ }
+
+ @Override
+ public String toString() {
+ return path.makeQualified(getHFS()).toString();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (! (obj instanceof HPath)) {
+ return false;
+ }
+
+ return this.path.equals(((HPath)obj).path);
+ }
+
+ public int compareTo(ElementDescriptor other) {
+ return path.compareTo(((HPath)other).path);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.path.hashCode();
+ }
+}
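In the new HPath, getConfiguration() stores the block replication factor in a Properties object as a String, and updateConfiguration() parses it back with new Short(blkReplStr). That constructor has been deprecated since Java 9, and it throws NumberFormatException when the key is absent, because getProperty() then returns null. The following is a minimal, Hadoop-free sketch of the same round trip, assuming a plain java.util.Properties. The class name, the helper methods, and the key string are hypothetical placeholders (HPath itself uses the BLOCK_REPLICATION_KEY constant unqualified), and the fallback parameter is an illustrative design choice, not part of HPath.

```java
import java.util.Properties;

// Hypothetical, Hadoop-free model of HPath's replication round trip.
public class ReplicationRoundTrip {
    // Placeholder key; HPath uses a BLOCK_REPLICATION_KEY constant whose value is not shown in this diff.
    static final String BLOCK_REPLICATION_KEY = "example.block.replication";

    // Mirrors getConfiguration(): the short replication factor is stored as a String.
    static Properties toProperties(short replication) {
        Properties props = new Properties();
        props.setProperty(BLOCK_REPLICATION_KEY, Short.toString(replication));
        return props;
    }

    // Mirrors updateConfiguration(), but parses with Short.parseShort instead of the
    // deprecated new Short(String), and returns a caller-chosen fallback instead of
    // throwing NumberFormatException when the key is missing.
    static short fromProperties(Properties props, short fallback) {
        String value = props.getProperty(BLOCK_REPLICATION_KEY);
        return (value == null) ? fallback : Short.parseShort(value.trim());
    }

    public static void main(String[] args) {
        Properties p = toProperties((short) 3);
        System.out.println(fromProperties(p, (short) 1));                // prints 3
        System.out.println(fromProperties(new Properties(), (short) 1)); // prints 1
    }
}
```

Returning a fallback keeps the missing-key case explicit. An alternative that matches the existing early return for a null newConfig would be to skip setReplication() entirely when the key is absent.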
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Feb 24 21:41:38 2014
@@ -40,7 +40,8 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.executionengine.ExecutionEngine;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -76,17 +77,21 @@ public abstract class HExecutionEngine i
private static final Log LOG = LogFactory.getLog(HExecutionEngine.class);
- public static final String LOCAL = "local";
- public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
- public static final String FILE_SYSTEM_LOCATION = "fs.default.name";
- public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
public static final String HADOOP_SITE = "hadoop-site.xml";
public static final String CORE_SITE = "core-site.xml";
public static final String YARN_SITE = "yarn-site.xml";
+ public static final String CORE_DEFAULT_SITE = "core-default.xml";
+ public static final String MAPRED_DEFAULT_SITE = "mapred-default.xml";
+ public static final String YARN_DEFAULT_SITE = "yarn-default.xml";
+
+ public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
+ public static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+ public static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
+ public static final String MAPREDUCE_FRAMEWORK_NAME = "mapreduce.framework.name";
+ public static final String LOCAL = "local";
protected PigContext pigContext;
protected DataStorage ds;
- protected JobConf jobConf;
protected Launcher launcher;
// key: the operator key from the logical plan that originated the physical plan
@@ -98,12 +103,14 @@ public abstract class HExecutionEngine i
public HExecutionEngine(PigContext pigContext) {
this.pigContext = pigContext;
this.ds = null;
- this.jobConf = null;
this.logicalToPhysicalKeys = Maps.newHashMap();
}
+ @Deprecated
public JobConf getJobConf() {
- return this.jobConf;
+ JobConf jc = new JobConf(false);
+ Utils.recomputeProperties(jc, pigContext.getProperties());
+ return jc;
}
public DataStorage getDataStorage() {
@@ -114,7 +121,52 @@ public abstract class HExecutionEngine i
init(this.pigContext.getProperties());
}
- @SuppressWarnings("resource")
+ public JobConf getLocalConf(Properties properties) {
+ JobConf jc = new JobConf(false);
+
+ // If we are running in local mode we don't read the hadoop conf file
+ if (properties.getProperty(MAPREDUCE_FRAMEWORK_NAME) == null) {
+ jc.set(MAPREDUCE_FRAMEWORK_NAME, LOCAL);
+ }
+ jc.set(JOB_TRACKER_LOCATION, LOCAL);
+ jc.set(FILE_SYSTEM_LOCATION, "file:///");
+ jc.set(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
+
+ jc.addResource(MAPRED_DEFAULT_SITE);
+ jc.addResource(YARN_DEFAULT_SITE);
+ jc.addResource(CORE_DEFAULT_SITE);
+
+ return jc;
+ }
+
+ public JobConf getExecConf(Properties properties) throws ExecException {
+ JobConf jc = null;
+ // Check existence of user provided configs
+ String isHadoopConfigsOverriden = properties.getProperty("pig.use.overriden.hadoop.configs");
+ if (isHadoopConfigsOverriden != null && isHadoopConfigsOverriden.equals("true")) {
+ jc = new JobConf(ConfigurationUtil.toConfiguration(properties));
+ } else {
+ // Check existence of hadoop-site.xml or core-site.xml in
+ // classpath if user provided confs are not being used
+ Configuration testConf = new Configuration();
+ ClassLoader cl = testConf.getClassLoader();
+ URL hadoop_site = cl.getResource(HADOOP_SITE);
+ URL core_site = cl.getResource(CORE_SITE);
+
+ if (hadoop_site == null && core_site == null) {
+ throw new ExecException(
+ "Cannot find hadoop configurations in classpath "
+ + "(neither hadoop-site.xml nor core-site.xml was found in the classpath)."
+ + " If you plan to use local mode, please put -x local option in command line",
+ 4010);
+ }
+ jc = new JobConf();
+ }
+ jc.addResource("pig-cluster-hadoop-site.xml");
+ jc.addResource(YARN_SITE);
+ return jc;
+ }
+
private void init(Properties properties) throws ExecException {
String cluster = null;
String nameNode = null;
@@ -138,54 +190,20 @@ public abstract class HExecutionEngine i
JobConf jc = null;
if (!this.pigContext.getExecType().isLocal()) {
- // Check existence of user provided configs
- String isHadoopConfigsOverriden = properties.getProperty("pig.use.overriden.hadoop.configs");
- if (isHadoopConfigsOverriden != null && isHadoopConfigsOverriden.equals("true")) {
- jc = new JobConf(ConfigurationUtil.toConfiguration(properties));
- } else {
- // Check existence of hadoop-site.xml or core-site.xml in
- // classpath if user provided confs are not being used
- Configuration testConf = new Configuration();
- ClassLoader cl = testConf.getClassLoader();
- URL hadoop_site = cl.getResource(HADOOP_SITE);
- URL core_site = cl.getResource(CORE_SITE);
-
- if (hadoop_site == null && core_site == null) {
- throw new ExecException(
- "Cannot find hadoop configurations in classpath "
- + "(neither hadoop-site.xml nor core-site.xml was found in the classpath)."
- + " If you plan to use local mode, please put -x local option in command line",
- 4010);
- }
- jc = new JobConf();
- }
- jc.addResource("pig-cluster-hadoop-site.xml");
- jc.addResource(YARN_SITE);
+ jc = getExecConf(properties);
// Trick to invoke static initializer of DistributedFileSystem to
// add hdfs-default.xml into configuration
new DistributedFileSystem();
-
- // the method below alters the properties object by overriding the
- // hadoop properties with the values from properties and recomputing
- // the properties
- Utils.recomputeProperties(jc, properties);
} else {
- // If we are running in local mode we dont read the hadoop conf file
- if (properties.getProperty("mapreduce.framework.name") == null) {
- properties.setProperty("mapreduce.framework.name", "local");
- }
- properties.setProperty(JOB_TRACKER_LOCATION, LOCAL);
- properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
- properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
-
- jc = new JobConf(false);
- jc.addResource("core-default.xml");
- jc.addResource("mapred-default.xml");
- jc.addResource("yarn-default.xml");
- Utils.recomputeProperties(jc, properties);
+ jc = getLocalConf(properties);
}
+ // the method below alters the properties object by overriding the
+ // hadoop properties with the values from properties and recomputing
+ // the properties
+ Utils.recomputeProperties(jc, properties);
+
cluster = jc.get(JOB_TRACKER_LOCATION);
nameNode = jc.get(FILE_SYSTEM_LOCATION);
if (nameNode == null) {
@@ -215,9 +233,6 @@ public abstract class HExecutionEngine i
LOG.info("Connecting to map-reduce job tracker at: "
+ jc.get(JOB_TRACKER_LOCATION));
}
-
- // Set job-specific configuration knobs
- jobConf = jc;
}
@SuppressWarnings("unchecked")
@@ -341,6 +356,13 @@ public abstract class HExecutionEngine i
try {
PhysicalPlan pp = compile(lp, pc.getProperties());
+ // If the compiled physical plan meets the fetch optimizer's requirements,
+ // further transformations and MR job creation are skipped; a
+ // SimpleFetchPigStats is returned, through which the result can be
+ // fetched directly from the underlying storage.
+ if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+ return new FetchLauncher(pc).launchPig(pp);
+ }
return launcher.launchPig(pp, grpName, pigContext);
} catch (ExecException e) {
throw (ExecException) e;
@@ -355,8 +377,8 @@ public abstract class HExecutionEngine i
public void explain(LogicalPlan lp, PigContext pc, PrintStream ps,
String format, boolean verbose, File file, String suffix)
- throws PlanException, VisitorException, IOException,
- FrontendException {
+ throws PlanException, VisitorException, IOException,
+ FrontendException {
PrintStream pps = ps;
PrintStream eps = ps;
@@ -371,6 +393,10 @@ public abstract class HExecutionEngine i
pp.explain(pps, format, verbose);
MapRedUtil.checkLeafIsStore(pp, pigContext);
+ if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+ new FetchLauncher(pigContext).explain(pp, pc, eps, format);
+ return;
+ }
launcher.explain(pp, pigContext, eps, format, verbose);
} finally {
launcher.reset();
@@ -383,10 +409,9 @@ public abstract class HExecutionEngine i
}
public Properties getConfiguration() {
- if (jobConf == null) {
- return null;
- }
- return ConfigurationUtil.toProperties(jobConf);
+ Properties properties = new Properties();
+ properties.putAll(pigContext.getProperties());
+ return properties;
}
public void setConfiguration(Properties newConfiguration) throws ExecException {
@@ -396,7 +421,6 @@ public abstract class HExecutionEngine i
public void setProperty(String property, String value) {
Properties properties = pigContext.getProperties();
properties.put(property, value);
- Utils.recomputeProperties(jobConf, properties);
}
public ExecutableManager getExecutableManager() {
@@ -405,7 +429,7 @@ public abstract class HExecutionEngine i
public void killJob(String jobID) throws BackendException {
if (launcher != null) {
- launcher.killJob(jobID, jobConf);
+ launcher.killJob(jobID, getJobConf());
}
}
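After this change, init() delegates to getLocalConf() or getExecConf() to build the configuration and then calls Utils.recomputeProperties(jc, properties) once for both branches. Before the change, the local branch wrote its defaults directly into the caller's Properties. The sketch below models the local path with plain Maps so it runs without Hadoop. It assumes recomputeProperties behaves as the comment in the diff describes: user properties are written over the configuration, and then properties is cleared and repopulated from the merged result. The class and method names are hypothetical, and the real JobConf also loads mapred-default.xml, yarn-default.xml, and core-default.xml, which this model omits.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical model of the refactored local-mode flow; a Map stands in for JobConf.
public class ConfMergeSketch {
    static final String MAPREDUCE_FRAMEWORK_NAME = "mapreduce.framework.name";
    static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
    static final String FILE_SYSTEM_LOCATION = "fs.default.name";
    static final String ALTERNATIVE_FILE_SYSTEM_LOCATION = "fs.defaultFS";
    static final String LOCAL = "local";

    // Mirrors getLocalConf(): seeds local-mode defaults into a fresh configuration
    // and leaves the caller's Properties untouched.
    static Map<String, String> localConf(Properties properties) {
        Map<String, String> jc = new HashMap<>();
        if (properties.getProperty(MAPREDUCE_FRAMEWORK_NAME) == null) {
            jc.put(MAPREDUCE_FRAMEWORK_NAME, LOCAL);
        }
        jc.put(JOB_TRACKER_LOCATION, LOCAL);
        jc.put(FILE_SYSTEM_LOCATION, "file:///");
        jc.put(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
        return jc;
    }

    // Assumed behaviour of Utils.recomputeProperties: user values override the
    // configuration, then properties is cleared and refilled from the merge.
    static void recompute(Map<String, String> jc, Properties properties) {
        for (String key : properties.stringPropertyNames()) {
            jc.put(key, properties.getProperty(key));
        }
        properties.clear();
        properties.putAll(jc);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        Map<String, String> jc = localConf(props);
        recompute(jc, props);
        System.out.println(props.getProperty(FILE_SYSTEM_LOCATION));     // prints file:///
        System.out.println(props.getProperty(MAPREDUCE_FRAMEWORK_NAME)); // prints local
    }
}
```

In this model, the overlay runs after the local defaults are seeded, so a value the user sets explicitly for one of these keys wins over the local default. Whether the real Utils.recomputeProperties produces exactly the same precedence is an assumption worth checking against its source.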
Copied: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (from r1571421, pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java)
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?p2=pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java&p1=pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java&r1=1571421&r2=1571454&rev=1571454&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Mon Feb 24 21:41:38 2014
@@ -32,19 +32,16 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -231,16 +228,6 @@ public class FetchOptimizer {
}
@Override
- public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException {
- planFetchable = false;
- }
-
- @Override
- public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException {
- planFetchable = false;
- }
-
- @Override
public void visitSplit(POSplit spl) throws VisitorException {
planFetchable = false;
}
@@ -271,11 +258,6 @@ public class FetchOptimizer {
}
@Override
- public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException {
- planFetchable = false;
- }
-
- @Override
public void visitCross(POCross cross) throws VisitorException {
planFetchable = false;
}
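FetchOptimizer decides fetchability with a plan visitor. Each visit method for an operator that needs a real job (visitSplit and visitCross are visible above) sets planFetchable = false, so a single such operator disqualifies the whole plan, and HExecutionEngine only takes the FetchLauncher path when the flag survives. The following is a heavily simplified, hypothetical model of that flag pattern. It uses operator names as strings instead of Pig's PhysicalPlan and PhyPlanVisitor types. The set below lists only the two operators whose visit methods appear in this diff, while the real visitor covers more.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model of FetchOptimizer's "one disqualifying operator" check.
public class FetchableCheckSketch {
    // Illustrative subset: operators whose visit methods in the diff set planFetchable = false.
    static final Set<String> NON_FETCHABLE = Set.of("POSplit", "POCross");

    static boolean isPlanFetchable(List<String> operators) {
        boolean planFetchable = true;
        for (String op : operators) {
            if (NON_FETCHABLE.contains(op)) {
                planFetchable = false; // any blocking operator rules out the fetch path
            }
        }
        return planFetchable;
    }

    public static void main(String[] args) {
        System.out.println(isPlanFetchable(List.of("POLoad", "POForEach", "POStore"))); // prints true
        System.out.println(isPlanFetchable(List.of("POLoad", "POCross", "POStore")));   // prints false
    }
}
```

Like the visitor, the model keeps walking after the flag flips instead of returning early. That makes no difference to the result, and it matches a visitor that must traverse the whole plan anyway.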