Posted to commits@pig.apache.org by ch...@apache.org on 2014/02/24 22:41:41 UTC
svn commit: r1571454 [3/5] - in /pig/branches/tez: ./ conf/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/
contrib/piggybank/java/src/main/java/...
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Feb 24 21:41:38 2014
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -31,10 +33,13 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +47,7 @@ import org.apache.hadoop.filecache.Distr
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.Counters;
@@ -54,6 +60,7 @@ import org.apache.hadoop.mapred.jobcontr
import org.apache.pig.ComparisonFunc;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
+import org.apache.pig.OverwritableStoreFunc;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
@@ -84,6 +91,7 @@ import org.apache.pig.data.SchemaTupleFr
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.io.NullableBigDecimalWritable;
@@ -107,6 +115,7 @@ import org.apache.pig.impl.util.ObjectSe
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
/**
@@ -133,9 +142,11 @@ import org.apache.pig.tools.pigstats.map
* These are all just type specific instances of WritableComparator.
*
*/
+@SuppressWarnings("deprecation")
public class JobControlCompiler{
MROperPlan plan;
Configuration conf;
+ Configuration defaultConf;
PigContext pigContext;
private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
@@ -155,6 +166,9 @@ public class JobControlCompiler{
public static final String PIG_MAP_SEPARATOR = "_";
public HashMap<String, ArrayList<Pair<String,Long>>> globalCounters = new HashMap<String, ArrayList<Pair<String,Long>>>();
+ public static final String SMALL_JOB_LOG_MSG = "This job was detected as a small job, will run in-process instead";
+ public static final String BIG_JOB_LOG_MSG = "This job cannot be converted to run in-process";
+
/**
* We will serialize the POStore(s) present in map and reduce in lists in
* the Hadoop Conf. In the case of Multi stores, we could deduce these from
@@ -173,8 +187,13 @@ public class JobControlCompiler{
private int counterSize;
public JobControlCompiler(PigContext pigContext, Configuration conf) {
+ this(pigContext, conf, null);
+ }
+
+ public JobControlCompiler(PigContext pigContext, Configuration conf, Configuration defaultConf) {
this.pigContext = pigContext;
this.conf = conf;
+ this.defaultConf = defaultConf;
jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
jobMroMap = new HashMap<Job, MapReduceOper>();
}
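
The new three-argument constructor lets the launcher pass a pristine default (local-mode) Configuration alongside the job conf, which getJob() later uses to rewrite a small job for in-process execution. A minimal sketch of the expected call site, taken from the MapReduceLauncher change later in this commit (exe is the MRExecutionEngine):

    Properties defaultProperties = new Properties();
    JobConf defaultJobConf = exe.getLocalConf(defaultProperties);
    Utils.recomputeProperties(defaultJobConf, defaultProperties);
    JobControlCompiler jcc = new JobControlCompiler(pigContext, conf,
            ConfigurationUtil.toConfiguration(defaultProperties));
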
@@ -278,15 +297,15 @@ public class JobControlCompiler{
String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "5000";
String pigJobControlSleep = conf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
if (!pigJobControlSleep.equals(defaultPigJobControlSleep)) {
- log.info("overriding default JobControl sleep (" + defaultPigJobControlSleep + ") to " + pigJobControlSleep);
+ log.info("overriding default JobControl sleep (" + defaultPigJobControlSleep + ") to " + pigJobControlSleep);
}
try {
- timeToSleep = Integer.parseInt(pigJobControlSleep);
+ timeToSleep = Integer.parseInt(pigJobControlSleep);
} catch (NumberFormatException e) {
- throw new RuntimeException("Invalid configuration " +
- "pig.jobcontrol.sleep=" + pigJobControlSleep +
- " should be a time in ms. default=" + defaultPigJobControlSleep, e);
+ throw new RuntimeException("Invalid configuration " +
+ "pig.jobcontrol.sleep=" + pigJobControlSleep +
+ " should be a time in ms. default=" + defaultPigJobControlSleep, e);
}
JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
@@ -362,7 +381,24 @@ public class JobControlCompiler{
try {
counters = HadoopShims.getCounters(job);
- groupCounters = counters.getGroup(getGroupName(counters.getGroupNames()));
+
+ String groupName = getGroupName(counters.getGroupNames());
+ // In case the counter group was not found, we need to find
+ // out why. The only acceptable state is that the relation
+ // was empty.
+ if (groupName == null) {
+ Counter outputRecords =
+ counters.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP)
+ .getCounterForName(MRPigStatsUtil.MAP_OUTPUT_RECORDS);
+
+ if(outputRecords.getCounter() == 0) {
+ globalCounters.put(operationID, new ArrayList<Pair<String, Long>>());
+ return;
+ } else {
+ throw new RuntimeException("Did not found RANK counter group for operationId: " + operationID);
+ }
+ }
+ groupCounters = counters.getGroup(groupName);
Iterator<Counter> it = groupCounters.iterator();
HashMap<Integer,Long> counterList = new HashMap<Integer, Long>();
@@ -399,6 +435,29 @@ public class JobControlCompiler{
}
return null;
}
+
+ private boolean okToRunLocal(org.apache.hadoop.mapreduce.Job job, MapReduceOper mro, List<POLoad> lds) throws IOException {
+ Configuration conf = job.getConfiguration();
+ if(!conf.getBoolean(PigConfiguration.PIG_AUTO_LOCAL_ENABLED, false)) {
+ return false;
+ }
+
+ long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job);
+ long inputByteMax = conf.getLong(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, 100*1000*1000l);
+ log.info("Size of input: " + totalInputFileSize +" bytes. Small job threshold: " + inputByteMax );
+ if (totalInputFileSize < 0 || totalInputFileSize > inputByteMax) {
+ return false;
+ }
+
+ int reducers = conf.getInt("mapred.reduce.tasks", 1);
+ log.info("No of reducers: " + reducers);
+ if (reducers > 1) {
+ return false;
+ }
+
+ return true;
+ }
+
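
okToRunLocal() converts a job only when auto local mode is enabled and the job is small: total input below the byte threshold and at most one reducer. A hedged sketch of opting in from a client, assuming PigConfiguration.PIG_AUTO_LOCAL_ENABLED and PIG_AUTO_LOCAL_INPUT_MAXBYTES resolve to the property names below:

    Properties props = new Properties();
    props.setProperty("pig.auto.local.enabled", "true");            // default: false
    props.setProperty("pig.auto.local.input.maxbytes", "50000000"); // default: 100,000,000
    PigServer pigServer = new PigServer(ExecType.MAPREDUCE, props);
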
/**
* The method that creates the Job corresponding to a MapReduceOper.
* The assumption is that
@@ -421,7 +480,7 @@ public class JobControlCompiler{
* @return Job corresponding to mro
* @throws JobCreationException
*/
- @SuppressWarnings({ "unchecked", "deprecation" })
+ @SuppressWarnings({ "unchecked" })
private Job getJob(MROperPlan plan, MapReduceOper mro, Configuration config, PigContext pigContext) throws JobCreationException{
org.apache.hadoop.mapreduce.Job nwJob = null;
@@ -470,7 +529,6 @@ public class JobControlCompiler{
}
try{
-
//Process the POLoads
List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
@@ -487,10 +545,12 @@ public class JobControlCompiler{
if(!mro.reducePlan.isEmpty()){
log.info("Reduce phase detected, estimating # of required reducers.");
adjustNumReducers(plan, mro, nwJob);
+ } else {
+ nwJob.setNumReduceTasks(0);
}
if(lds!=null && lds.size()>0){
- for (POLoad ld : lds) {
+ for (POLoad ld : lds) {
//Store the target operators for tuples read
//from this input
List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
@@ -511,28 +571,50 @@ public class JobControlCompiler{
if (!pigContext.inIllustrator && ! pigContext.getExecType().isLocal())
{
+ if (okToRunLocal(nwJob, mro, lds)) {
+ log.info(SMALL_JOB_LOG_MSG);
+ // override with the default conf to run in local mode
+ for (Entry<String, String> entry : defaultConf) {
+ String key = entry.getKey();
+ if (key.equals("mapred.reduce.tasks")) {
+ // this must not be set back to the default in case it has been set to 0 for example.
+ continue;
+ }
+ if (key.startsWith("fs.")) {
+ // we don't want to change fs settings back
+ continue;
+ }
+ String value = entry.getValue();
+ if (conf.get(key) == null || !conf.get(key).equals(value)) {
+ conf.set(key, value);
+ }
+ }
- // Setup the DistributedCache for this job
- for (URL extraJar : pigContext.extraJars) {
- log.debug("Adding jar to DistributedCache: " + extraJar.toString());
- Utils.putJarOnClassPathThroughDistributedCache(pigContext, conf, extraJar);
- }
-
- for (String scriptJar : pigContext.scriptJars) {
- log.debug("Adding jar to DistributedCache: " + scriptJar.toString());
- Utils.putJarOnClassPathThroughDistributedCache(pigContext, conf, new File(scriptJar).toURI().toURL());
- }
+ conf.setBoolean(PigImplConstants.CONVERTED_TO_LOCAL, true);
+ } else {
+ log.info(BIG_JOB_LOG_MSG);
+ // Setup the DistributedCache for this job
+ for (URL extraJar : pigContext.extraJars) {
+ log.debug("Adding jar to DistributedCache: " + extraJar.toString());
+ putJarOnClassPathThroughDistributedCache(pigContext, conf, extraJar);
+ }
- //Create the jar of all functions and classes required
- File submitJarFile = File.createTempFile("Job", ".jar");
- log.info("creating jar file "+submitJarFile.getName());
- // ensure the job jar is deleted on exit
- submitJarFile.deleteOnExit();
- FileOutputStream fos = new FileOutputStream(submitJarFile);
- JarManager.createJar(fos, mro.UDFs, pigContext);
- log.info("jar file "+submitJarFile.getName()+" created");
- //Start setting the JobConf properties
- conf.set("mapred.jar", submitJarFile.getPath());
+ for (String scriptJar : pigContext.scriptJars) {
+ log.debug("Adding jar to DistributedCache: " + scriptJar.toString());
+ putJarOnClassPathThroughDistributedCache(pigContext, conf, new File(scriptJar).toURI().toURL());
+ }
+
+ //Create the jar of all functions and classes required
+ File submitJarFile = File.createTempFile("Job", ".jar");
+ log.info("creating jar file "+submitJarFile.getName());
+ // ensure the job jar is deleted on exit
+ submitJarFile.deleteOnExit();
+ FileOutputStream fos = new FileOutputStream(submitJarFile);
+ JarManager.createJar(fos, mro.UDFs, pigContext);
+ log.info("jar file "+submitJarFile.getName()+" created");
+ //Start setting the JobConf properties
+ conf.set("mapred.jar", submitJarFile.getPath());
+ }
}
conf.set("pig.inputs", ObjectSerializer.serialize(inp));
conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
@@ -553,46 +635,80 @@ public class JobControlCompiler{
// and set the hadoop job priority.
String jobPriority = pigContext.getProperties().getProperty(PigContext.JOB_PRIORITY).toUpperCase();
try {
- // Allow arbitrary case; the Hadoop job priorities are all upper case.
- conf.set("mapred.job.priority", JobPriority.valueOf(jobPriority).toString());
+ // Allow arbitrary case; the Hadoop job priorities are all upper case.
+ conf.set("mapred.job.priority", JobPriority.valueOf(jobPriority).toString());
} catch (IllegalArgumentException e) {
- StringBuffer sb = new StringBuffer("The job priority must be one of [");
- JobPriority[] priorities = JobPriority.values();
- for (int i = 0; i < priorities.length; ++i) {
- if (i > 0) sb.append(", ");
- sb.append(priorities[i]);
- }
- sb.append("]. You specified [" + jobPriority + "]");
- throw new JobCreationException(sb.toString());
+ StringBuffer sb = new StringBuffer("The job priority must be one of [");
+ JobPriority[] priorities = JobPriority.values();
+ for (int i = 0; i < priorities.length; ++i) {
+ if (i > 0) sb.append(", ");
+ sb.append(priorities[i]);
+ }
+ sb.append("]. You specified [" + jobPriority + "]");
+ throw new JobCreationException(sb.toString());
}
}
- setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(),
- "pig.streaming.ship.files", true);
- setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(),
- "pig.streaming.cache.files", false);
+ setupDistributedCache(pigContext, conf, pigContext.getProperties(),
+ "pig.streaming.ship.files", true);
+ setupDistributedCache(pigContext, conf, pigContext.getProperties(),
+ "pig.streaming.cache.files", false);
nwJob.setInputFormatClass(PigInputFormat.class);
+ // tmp file compression setups
+ // PIG-3741 This must be done before setStoreLocation on POStores
+ Utils.setTmpFileCompressionOnConf(pigContext, conf);
+
//Process POStore and remove it from the plan
LinkedList<POStore> mapStores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
LinkedList<POStore> reduceStores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
- for (POStore st: mapStores) {
+ for (POStore st : mapStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ if (sFunc instanceof OverwritableStoreFunc) {
+ OverwritableStoreFunc osf = (OverwritableStoreFunc) sFunc;
+ if (osf.shouldOverwrite()) {
+ osf.cleanupOutput(st, nwJob);
+ }
+ }
}
- for (POStore st: reduceStores) {
+ for (POStore st : reduceStores) {
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ if (sFunc instanceof OverwritableStoreFunc) {
+ OverwritableStoreFunc osf = (OverwritableStoreFunc) sFunc;
+ if (osf.shouldOverwrite()) {
+ osf.cleanupOutput(st, nwJob);
+ }
+ }
}
- // the OutputFormat we report to Hadoop is always PigOutputFormat
- nwJob.setOutputFormatClass(PigOutputFormat.class);
+ // the OutputFormat we report to Hadoop is always PigOutputFormat which
+ // can be wrapped with LazyOutputFormat, provided it is supported by
+ // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ if ("true".equalsIgnoreCase(conf.get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+ try {
+ Class<?> clazz = PigContext.resolveClassName(
+ "org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+ Method method = clazz.getMethod("setOutputFormatClass", nwJob.getClass(),
+ Class.class);
+ method.invoke(null, nwJob, PigOutputFormat.class);
+ }
+ catch (Exception e) {
+ nwJob.setOutputFormatClass(PigOutputFormat.class);
+ log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+ + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+ }
+ }
+ else {
+ nwJob.setOutputFormatClass(PigOutputFormat.class);
+ }
if (mapStores.size() + reduceStores.size() == 1) { // single store case
log.info("Setting up single store job");
@@ -608,28 +724,12 @@ public class JobControlCompiler{
if(!pigContext.inIllustrator)
mro.reducePlan.remove(st);
}
-
- // set out filespecs
- String outputPathString = st.getSFile().getFileName();
- if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
- conf.set("pig.streaming.log.dir",
- new Path(outputPathString, LOG_DIR).toString());
- } else {
- String tmpLocationStr = FileLocalizer
- .getTemporaryPath(pigContext).toString();
- tmpLocation = new Path(tmpLocationStr);
- conf.set("pig.streaming.log.dir",
- new Path(tmpLocation, LOG_DIR).toString());
- }
- conf.set("pig.streaming.task.output.dir", outputPathString);
+
+ MapRedUtil.setupStreamingDirsConfSingle(st, pigContext, conf);
}
- else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
+ else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
log.info("Setting up multi store job");
- String tmpLocationStr = FileLocalizer
- .getTemporaryPath(pigContext).toString();
- tmpLocation = new Path(tmpLocationStr);
-
- nwJob.setOutputFormatClass(PigOutputFormat.class);
+ MapRedUtil.setupStreamingDirsConfMulti(pigContext, conf);
boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
if (disableCounter) {
@@ -641,11 +741,7 @@ public class JobControlCompiler{
sto.setMultiStore(true);
sto.setIndex(idx++);
}
-
- conf.set("pig.streaming.log.dir",
- new Path(tmpLocation, LOG_DIR).toString());
- conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
- }
+ }
// store map key type
// this is needed when the key is null to create
@@ -672,7 +768,6 @@ public class JobControlCompiler{
if(mro.reducePlan.isEmpty()){
//MapOnly Job
nwJob.setMapperClass(PigMapOnly.Map.class);
- nwJob.setNumReduceTasks(0);
if(!pigContext.inIllustrator)
conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
if(mro.isEndOfAllInputSetInMap()) {
@@ -775,7 +870,7 @@ public class JobControlCompiler{
} else {
conf.set("pig.sortOrder",
- ObjectSerializer.serialize(mro.getSortOrder()));
+ ObjectSerializer.serialize(mro.getSortOrder()));
}
}
@@ -820,9 +915,6 @@ public class JobControlCompiler{
conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
}
- // tmp file compression setups
- Utils.setTmpFileCompressionOnConf(pigContext, conf);
-
String tmp;
long maxCombinedSplitSize = 0;
if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false"))
@@ -844,12 +936,12 @@ public class JobControlCompiler{
if (newfiles!=null) {
String files = conf.get("mapreduce.job.cache.files");
conf.set("mapreduce.job.cache.files",
- files == null ? newfiles.toString() : files + "," + newfiles);
+ files == null ? newfiles.toString() : files + "," + newfiles);
}
}
// Serialize the UDF specific context info.
UDFContext.getUDFContext().serialize(conf);
- Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList<Job>());
+ Job cjob = new Job(new JobConf(conf), new ArrayList<Job>());
jobStoreMap.put(cjob,new Pair<List<POStore>, Path>(storeLocations, tmpLocation));
return cjob;
@@ -886,7 +978,7 @@ public class JobControlCompiler{
// set the runtime #reducer of the next job as the #partition
ParallelConstantVisitor visitor =
- new ParallelConstantVisitor(mro.reducePlan, nPartitions);
+ new ParallelConstantVisitor(mro.reducePlan, nPartitions);
visitor.visit();
}
log.info("Setting Parallelism to " + jobParallelism);
@@ -931,7 +1023,7 @@ public class JobControlCompiler{
} else {
// reducer estimation could return -1 if it couldn't estimate
log.info("Could not estimate number of reducers and no requested or default " +
- "parallelism set. Defaulting to 1 reducer.");
+ "parallelism set. Defaulting to 1 reducer.");
jobParallelism = 1;
}
}
@@ -949,17 +1041,17 @@ public class JobControlCompiler{
* @throws IOException
*/
public static int estimateNumberOfReducers(org.apache.hadoop.mapreduce.Job job,
- MapReduceOper mapReducerOper) throws IOException {
+ MapReduceOper mapReducerOper) throws IOException {
Configuration conf = job.getConfiguration();
PigReducerEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ?
- new InputSizeReducerEstimator() :
- PigContext.instantiateObjectFromParams(conf,
- REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
-
- log.info("Using reducer estimator: " + estimator.getClass().getName());
- int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReducerOper);
- return numberOfReducers;
+ new InputSizeReducerEstimator() :
+ PigContext.instantiateObjectFromParams(conf,
+ REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
+
+ log.info("Using reducer estimator: " + estimator.getClass().getName());
+ int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReducerOper);
+ return numberOfReducers;
}
public static class PigSecondaryKeyGroupComparator extends WritableComparator {
@@ -1344,31 +1436,31 @@ public class JobControlCompiler{
PigContext pigContext, Configuration conf) throws IOException {
new JoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf)
- .visit();
+ .visit();
new JoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf)
- .visit();
+ .visit();
}
private void setupDistributedCacheForUdfs(MapReduceOper mro,
- PigContext pigContext,
- Configuration conf) throws IOException {
+ PigContext pigContext,
+ Configuration conf) throws IOException {
new UdfDistributedCacheVisitor(mro.mapPlan, pigContext, conf).visit();
new UdfDistributedCacheVisitor(mro.reducePlan, pigContext, conf).visit();
}
private static void setupDistributedCache(PigContext pigContext,
- Configuration conf,
- Properties properties, String key,
- boolean shipToCluster)
- throws IOException {
+ Configuration conf,
+ Properties properties, String key,
+ boolean shipToCluster)
+ throws IOException {
// Set up the DistributedCache for this job
String fileNames = properties.getProperty(key);
- if (fileNames != null) {
- String[] paths = fileNames.split(",");
- setupDistributedCache(pigContext, conf, paths, shipToCluster);
- }
+ if (fileNames != null) {
+ String[] paths = fileNames.split(",");
+ setupDistributedCache(pigContext, conf, paths, shipToCluster);
+ }
}
private static void addToDistributedCache(URI uri, Configuration conf) {
@@ -1396,7 +1488,7 @@ public class JobControlCompiler{
// DistributedCache
if (shipToCluster) {
Path dst =
- new Path(FileLocalizer.getTemporaryPath(pigContext).toString());
+ new Path(FileLocalizer.getTemporaryPath(pigContext).toString());
FileSystem fs = dst.getFileSystem(conf);
fs.copyFromLocalFile(src, dst);
@@ -1419,7 +1511,7 @@ public class JobControlCompiler{
break;
}
String msg = "Invalid ship specification. " +
- "File doesn't exist: " + dst;
+ "File doesn't exist: " + dst;
throw new ExecException(msg, errCode, errSrc);
}
addToDistributedCache(dstURI, conf);
@@ -1430,6 +1522,10 @@ public class JobControlCompiler{
}
}
+ private static boolean isLocal(PigContext pigContext, Configuration conf) {
+ return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
+ }
+
private static String addSingleFileToDistributedCache(
PigContext pigContext, Configuration conf, String filename,
String prefix) throws IOException {
@@ -1437,14 +1533,14 @@ public class JobControlCompiler{
if (!pigContext.inIllustrator && !FileLocalizer.fileExists(filename, pigContext)) {
throw new IOException(
"Internal error: skew join partition file "
- + filename + " does not exist");
+ + filename + " does not exist");
}
String symlink = filename;
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added by Hadoop team.
- if (!pigContext.getExecType().isLocal()) {
+ if (!isLocal(pigContext, conf)) {
symlink = prefix + "_"
+ Integer.toString(System.identityHashCode(filename)) + "_"
+ Long.toString(System.currentTimeMillis());
@@ -1487,6 +1583,118 @@ public class JobControlCompiler{
}
}
+ /**
+ * If the url is not in HDFS, it is first copied from the local
+ * filesystem to HDFS before being added to the distributed cache.
+ * @param pigContext the pigContext
+ * @param conf the job conf
+ * @param url the url to be added to distributed cache
+ * @throws IOException
+ */
+ @SuppressWarnings("deprecation")
+ private static void putJarOnClassPathThroughDistributedCache(
+ PigContext pigContext,
+ Configuration conf,
+ URL url) throws IOException {
+
+ // Turn on the symlink feature
+ DistributedCache.createSymlink(conf);
+
+ // REGISTER always copies locally the jar file. see PigServer.registerJar()
+ Path pathInHDFS = shipToHDFS(pigContext, conf, url);
+ // and add to the DistributedCache
+ DistributedCache.addFileToClassPath(pathInHDFS, conf);
+ pigContext.skipJars.add(url.getPath());
+ }
+
+ private static Path getCacheStagingDir(Configuration conf) throws IOException {
+ String pigTempDir = conf.get(PigConfiguration.PIG_USER_CACHE_LOCATION,
+ conf.get(PigConfiguration.PIG_TEMP_DIR, "/tmp"));
+ String currentUser = System.getProperty("user.name");
+ Path stagingDir = new Path(pigTempDir + "/" + currentUser + "/", ".pigcache");
+ FileSystem fs = FileSystem.get(conf);
+ fs.mkdirs(stagingDir);
+ fs.setPermission(stagingDir, FileLocalizer.OWNER_ONLY_PERMS);
+ return stagingDir;
+ }
+
+ private static Path getFromCache(PigContext pigContext,
+ Configuration conf,
+ URL url) throws IOException {
+ try {
+ Path stagingDir = getCacheStagingDir(conf);
+ String filename = FilenameUtils.getName(url.getPath());
+
+ String checksum = DigestUtils.shaHex(url.openStream());
+ FileSystem fs = FileSystem.get(conf);
+ Path cacheDir = new Path(stagingDir, checksum);
+ FileStatus [] statuses = fs.listStatus(cacheDir);
+ if (statuses != null) {
+ for (FileStatus stat : statuses) {
+ Path jarPath = stat.getPath();
+ if(jarPath.getName().equals(filename)) {
+ log.info("Found " + url + " in jar cache at "+ stagingDir);
+ long curTime = System.currentTimeMillis();
+ fs.setTimes(jarPath, -1, curTime);
+ return jarPath;
+ }
+ }
+ }
+ log.info("Url "+ url + " was not found in jarcache at "+ stagingDir);
+ // attempt to copy to cache else return null
+ fs.mkdirs(cacheDir, FileLocalizer.OWNER_ONLY_PERMS);
+ Path cacheFile = new Path(cacheDir, filename);
+ OutputStream os = FileSystem.create(fs, cacheFile, FileLocalizer.OWNER_ONLY_PERMS);
+ try {
+ IOUtils.copyBytes(url.openStream(), os, 4096, true);
+ } finally {
+ os.close();
+ }
+ return cacheFile;
+
+ } catch (IOException ioe) {
+ log.info("Unable to retrieve jar from jar cache ", ioe);
+ return null;
+ }
+ }
+
+ /**
+ * Copies the file to HDFS under a temporary path.
+ * @param pigContext the pig context
+ * @param conf the job conf
+ * @param url the url to ship to hdfs
+ * @return the location where it was shipped
+ * @throws IOException
+ */
+ private static Path shipToHDFS(
+ PigContext pigContext,
+ Configuration conf,
+ URL url) throws IOException {
+
+ boolean cacheEnabled =
+ conf.getBoolean(PigConfiguration.PIG_USER_CACHE_ENABLED, false);
+ if (cacheEnabled) {
+ Path pathOnDfs = getFromCache(pigContext, conf, url);
+ if(pathOnDfs != null) {
+ return pathOnDfs;
+ }
+ }
+ String suffix = FilenameUtils.getName(url.getPath());
+
+ Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix);
+ FileSystem fs = dst.getFileSystem(conf);
+ OutputStream os = fs.create(dst);
+ try {
+ IOUtils.copyBytes(url.openStream(), os, 4096, true);
+ } finally {
+ // copyBytes(close=true) closes both streams only once it is reached; if
+ // url.openStream() throws first, the output stream would leak, so close it here too
+ os.close();
+ }
+ return dst;
+ }
+
+
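
Together, getCacheStagingDir(), getFromCache() and shipToHDFS() implement a content-addressed jar cache: each jar is stored under <staging-dir>/<sha1-of-bytes>/<filename>, reused whenever the checksum matches (with its access time refreshed), and shipped to a per-job temporary path when caching is off or fails. A hedged sketch of enabling it, assuming PIG_USER_CACHE_ENABLED and PIG_USER_CACHE_LOCATION resolve to the property names below:

    Properties props = pigContext.getProperties();
    props.setProperty("pig.user.cache.enabled", "true");  // default: false
    props.setProperty("pig.user.cache.location", "/tmp"); // defaults to pig.temp.dir
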
private static class JoinDistributedCacheVisitor extends PhyPlanVisitor {
private PigContext pigContext = null;
@@ -1506,7 +1714,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (pigContext.getExecType().isLocal()) return;
+ if (isLocal(pigContext, conf)) return;
// set up distributed cache for the replicated files
FileSpec[] replFiles = join.getReplFiles();
@@ -1524,8 +1732,8 @@ public class JobControlCompiler{
symlink = "pigrepl_" + join.getOperatorKey().toString() + "_"
+ Integer.toString(System.identityHashCode(
replFiles[i].getFileName()))
- + "_" + Long.toString(System.currentTimeMillis())
- + "_" + i;
+ + "_" + Long.toString(System.currentTimeMillis())
+ + "_" + i;
replicatedPath.add(replFiles[i].getFileName() + "#"
+ symlink);
@@ -1562,7 +1770,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (pigContext.getExecType().isLocal()) return;
+ if (isLocal(pigContext, conf)) return;
String indexFile = join.getIndexFile();
@@ -1586,7 +1794,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (pigContext.getExecType().isLocal()) return;
+ if (isLocal(pigContext, conf)) return;
String indexFile = mergeCoGrp.getIndexFileName();
@@ -1623,7 +1831,7 @@ public class JobControlCompiler{
// XXX Hadoop currently doesn't support distributed cache in local mode.
// This line will be removed after the support is added
- if (pigContext.getExecType().isLocal()) return;
+ if (isLocal(pigContext, conf)) return;
// set up distributed cache for files indicated by the UDF
String[] files = func.getCacheFiles();
@@ -1659,7 +1867,7 @@ public class JobControlCompiler{
if (replaced) {
// sample job should have only one ConstantExpression
throw new VisitorException("Invalid reduce plan: more " +
- "than one ConstantExpression found in sampling job");
+ "than one ConstantExpression found in sampling job");
}
cnst.setValue(rp);
cnst.setRequestedParallelism(rp);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Feb 24 21:41:38 2014
@@ -42,6 +42,7 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.OrderedLoadFunc;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.backend.executionengine.ExecException;
@@ -1721,12 +1722,12 @@ public class MRCompiler extends PhyPlanV
String msg = "Error compiling operator " + joinOp.getClass().getCanonicalName();
throw new MRCompilerException(msg, errCode, PigException.BUG, e);
}
- catch (IOException e){
+ catch (IOException e){
int errCode = 3000;
String errMsg = "IOException caught while compiling POMergeJoin";
throw new MRCompilerException(errMsg, errCode,e);
}
- catch(CloneNotSupportedException e){
+ catch(CloneNotSupportedException e){
int errCode = 2127;
String errMsg = "Cloning exception caught while compiling POMergeJoin";
throw new MRCompilerException(errMsg, errCode, PigException.BUG, e);
@@ -2347,13 +2348,14 @@ public class MRCompiler extends PhyPlanV
FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans,
String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
- String[] rslargs = new String[2];
+ String[] rslargs = new String[2];
// SampleLoader expects string version of FuncSpec
// as its first constructor argument.
rslargs[0] = (new FuncSpec(Utils.getTmpFileCompressorName(pigContext))).toString();
-
- rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
+ // This value is only used by order by. For skewed join, it's calculated
+ // based on the file size.
+ rslargs[1] = pigContext.getProperties().getProperty(PigConfiguration.PIG_RANDOM_SAMPLER_SAMPLE_SIZE, "100");
FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
new FuncSpec(sampleLdrClassName, rslargs));
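
The sample size previously hard-coded to 100 is now configurable for order-by. A one-line sketch, assuming PigConfiguration.PIG_RANDOM_SAMPLER_SAMPLE_SIZE resolves to the property name below:

    pigContext.getProperties().setProperty("pig.random.sampler.sample.size", "200");
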
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Feb 24 21:41:38 2014
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.TransformerException;
@@ -49,6 +50,7 @@ import org.apache.pig.backend.BackendExc
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
@@ -89,7 +91,7 @@ public class MapReduceLauncher extends L
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
- "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+ "mapreduce.fileoutputcommitter.marksuccessfuljobs";
private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
@@ -141,32 +143,36 @@ public class MapReduceLauncher extends L
return failureMap.get(spec);
}
- @SuppressWarnings("deprecation")
@Override
public PigStats launchPig(PhysicalPlan php,
- String grpName,
- PigContext pc) throws PlanException,
- VisitorException,
- IOException,
- ExecException,
- JobCreationException,
- Exception {
+ String grpName,
+ PigContext pc) throws PlanException,
+ VisitorException,
+ IOException,
+ ExecException,
+ JobCreationException,
+ Exception {
long sleepTime = 500;
- aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
+ aggregateWarning = Boolean.valueOf(pc.getProperties().getProperty("aggregate.warning"));
MROperPlan mrp = compile(php, pc);
ConfigurationValidator.validatePigProperties(pc.getProperties());
Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
MRExecutionEngine exe = (MRExecutionEngine) pc.getExecutionEngine();
- JobClient jobClient = new JobClient(exe.getJobConf());
+ Properties defaultProperties = new Properties();
+ JobConf defaultJobConf = exe.getLocalConf(defaultProperties);
+ Utils.recomputeProperties(defaultJobConf, defaultProperties);
+
+ // This is a generic JobClient for checking progress of the jobs
+ JobClient statsJobClient = new JobClient(exe.getJobConf());
- JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+ JobControlCompiler jcc = new JobControlCompiler(pc, conf, ConfigurationUtil.toConfiguration(defaultProperties));
MRScriptState.get().addWorkflowAdjacenciesToConf(mrp, conf);
// start collecting statistics
- MRPigStatsUtil.startCollection(pc, jobClient, jcc, mrp);
+ MRPigStatsUtil.startCollection(pc, statsJobClient, jcc, mrp);
// Find all the intermediate data stores. The plan will be destroyed during compile/execution
// so this needs to be done before.
@@ -187,7 +193,7 @@ public class MapReduceLauncher extends L
JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
boolean stop_on_failure =
- pc.getProperties().getProperty("stop.on.failure", "false").equals("true");
+ Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false"));
// jc is null only when mrp.size == 0
while(mrp.size() != 0) {
@@ -210,14 +216,14 @@ public class MapReduceLauncher extends L
failedNativeMR.add(natOp);
String msg = "Error running native mapreduce" +
- " operator job :" + natOp.getJobId() + e.getMessage();
+ " operator job :" + natOp.getJobId() + e.getMessage();
String stackTrace = Utils.getStackStraceStr(e);
LogUtils.writeLog(msg,
stackTrace,
pc.getProperties().getProperty("pig.logfile"),
log
- );
+ );
log.info(msg);
if (stop_on_failure) {
@@ -250,10 +256,10 @@ public class MapReduceLauncher extends L
JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
try {
String port = jobConf.get("mapred.job.tracker.http.address");
- String jobTrackerAdd = jobConf.get(MRExecutionEngine.JOB_TRACKER_LOCATION);
+ String jobTrackerAdd = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
jobTrackerLoc = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"))
- + port.substring(port.indexOf(":"));
+ + port.substring(port.indexOf(":"));
}
catch(Exception e){
// Could not get the job tracker location, most probably we are running in local mode.
@@ -336,7 +342,7 @@ public class MapReduceLauncher extends L
}
jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);
- double prog = (numMRJobsCompl+calculateProgress(jc, jobClient))/totalMRJobs;
+ double prog = (numMRJobsCompl+calculateProgress(jc, statsJobClient))/totalMRJobs;
if (notifyProgress(prog, lastProg)) {
lastProg = prog;
}
@@ -363,8 +369,8 @@ public class MapReduceLauncher extends L
if (jobControlExceptionStackTrace != null) {
LogUtils.writeLog("Error message from job controller",
jobControlExceptionStackTrace, pc
- .getProperties().getProperty(
- "pig.logfile"), log);
+ .getProperties().getProperty(
+ "pig.logfile"), log);
}
throw jobControlException;
} else {
@@ -418,10 +424,13 @@ public class MapReduceLauncher extends L
failed = true;
}
- if (!"false".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.PIG_DELETE_TEMP_FILE))) {
+ if (Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_DELETE_TEMP_FILE, "true"))) {
// Clean up all the intermediate data
for (String path : intermediateVisitor.getIntermediate()) {
- FileLocalizer.delete(path, pc);
+ // Skip non-file system paths such as hbase, see PIG-3617
+ if (Utils.hasFileSystemImpl(new Path(path), conf)) {
+ FileLocalizer.delete(path, pc);
+ }
}
}
@@ -431,7 +440,7 @@ public class MapReduceLauncher extends L
Exception backendException = null;
for (Job fj : failedJobs) {
try {
- getStats(fj, jobClient, true, pc);
+ getStats(fj, statsJobClient, true, pc);
} catch (Exception e) {
backendException = e;
}
@@ -471,9 +480,9 @@ public class MapReduceLauncher extends L
}
}
- getStats(job, jobClient, false, pc);
+ getStats(job, statsJobClient, false, pc);
if (aggregateWarning) {
- computeWarningAggregate(job, jobClient, warningAggMap);
+ computeWarningAggregate(job, statsJobClient, warningAggMap);
}
}
@@ -496,8 +505,8 @@ public class MapReduceLauncher extends L
int ret = failed ? ((succJobs != null && succJobs.size() > 0)
? ReturnCode.PARTIAL_FAILURE
- : ReturnCode.FAILURE)
- : ReturnCode.SUCCESS;
+ : ReturnCode.FAILURE)
+ : ReturnCode.SUCCESS;
PigStats pigStats = PigStatsUtil.getPigStats(ret);
// run cleanup for all of the stores
@@ -574,7 +583,7 @@ public class MapReduceLauncher extends L
PrintStream ps,
String format,
boolean verbose) throws PlanException, VisitorException,
- IOException {
+ IOException {
log.trace("Entering MapReduceLauncher.explain");
MROperPlan mrp = compile(php, pc);
@@ -636,9 +645,9 @@ public class MapReduceLauncher extends L
// We must ensure that there is only 1 reducer for a limit. Add a single-reducer job.
if (!pc.inIllustrator) {
- LimitAdjuster la = new LimitAdjuster(plan, pc);
- la.visit();
- la.adjust();
+ LimitAdjuster la = new LimitAdjuster(plan, pc);
+ la.visit();
+ la.adjust();
}
// Optimize to use secondary sort key if possible
prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
@@ -653,7 +662,7 @@ public class MapReduceLauncher extends L
// optimize joins
LastInputStreamingOptimizer liso =
- new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
+ new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
liso.visit();
// figure out the type of the key for the map plan
@@ -668,7 +677,7 @@ public class MapReduceLauncher extends L
fRem.visit();
boolean isMultiQuery =
- "true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
+ Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
if (isMultiQuery) {
// reduces the number of MROpers in the MR plan generated
@@ -690,7 +699,7 @@ public class MapReduceLauncher extends L
checker.visit();
boolean isAccum =
- "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.accumulator","true"));
+ Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator","true"));
if (isAccum) {
AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
accum.visit();
@@ -700,7 +709,7 @@ public class MapReduceLauncher extends L
private boolean shouldMarkOutputDir(Job job) {
return job.getJobConf().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
- false);
+ false);
}
private void createSuccessFile(Job job, POStore store) throws IOException {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Mon Feb 24 21:41:38 2014
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Set;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Mon Feb 24 21:41:38 2014
@@ -17,26 +17,31 @@
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+import java.util.Map;
+import java.util.WeakHashMap;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
- *
+ *
* A singleton class that implements the PigLogger interface
* for use in map reduce context. Provides ability to aggregate
* warning messages
*/
public final class PigHadoopLogger implements PigLogger {
- private static PigHadoopLogger instance = new PigHadoopLogger();
-
- public static synchronized PigHadoopLogger getInstance() {
- if (instance == null) {
- instance = new PigHadoopLogger();
- }
- return instance;
- }
+ private static class PigHadoopLoggerHelper {
+ private static PigHadoopLogger instance = new PigHadoopLogger();
+ }
+
+ public static PigHadoopLogger getInstance() {
+ return PigHadoopLoggerHelper.instance;
+ }
private static Log log = LogFactory.getLog(PigHadoopLogger.class);
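
The rewritten getInstance() is the initialization-on-demand holder idiom: the JVM initializes PigHadoopLoggerHelper's static field exactly once, when the holder class is first loaded, so the accessor is lazy and thread-safe without a synchronized block. The pattern in isolation:

    public final class Lazy {
        private Lazy() {}
        private static class Holder {
            static final Lazy INSTANCE = new Lazy(); // runs once, on first getInstance()
        }
        public static Lazy getInstance() {
            return Holder.INSTANCE; // no locking on the read path
        }
    }
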
@@ -44,21 +49,33 @@ public final class PigHadoopLogger imple
private boolean aggregate = false;
+ private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
+
private PigHadoopLogger() {
- }
+ }
- @SuppressWarnings("unchecked")
+ @SuppressWarnings("rawtypes")
public void warn(Object o, String msg, Enum warningEnum) {
- String displayMessage = o.getClass().getName() + ": " + msg;
-
+ String className = o.getClass().getName();
+ String displayMessage = className + "(" + warningEnum + "): " + msg;
+
if (getAggregate()) {
if (reporter != null) {
- reporter.getCounter(warningEnum).increment(1);
+ // log at least once
+ if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
+ log.warn(displayMessage);
+ msgMap.put(o, displayMessage);
+ }
+ if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
+ reporter.getCounter(className, warningEnum.name()).increment(1);
+ } else {
+ reporter.getCounter(warningEnum).increment(1);
+ }
} else {
//TODO:
//in local mode of execution if the PigHadoopLogger is used initially,
- //then aggregation cannot be performed as the reporter will be null.
- //The reference to a reporter is given by Hadoop at run time.
+ //then aggregation cannot be performed as the reporter will be null.
+ //The reference to a reporter is given by Hadoop at run time.
//In local mode, due to the absence of Hadoop there will be no reporter
//Just print the warning message as is.
//If a warning message is printed in map reduce mode when aggregation
@@ -68,16 +85,16 @@ public final class PigHadoopLogger imple
} else {
log.warn(displayMessage);
}
- }
+ }
public synchronized void setReporter(PigStatusReporter rep) {
this.reporter = rep;
}
-
+
public synchronized boolean getAggregate() {
return aggregate;
}
-
+
public synchronized void setAggregate(boolean aggregate) {
this.aggregate = aggregate;
}
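
With this change, warnings raised from EvalFunc, LoadFunc, and StoreFunc instances are counted under a group named after the UDF class instead of the generic warning-enum group, and each distinct message is also logged at least once. A hedged sketch of a UDF that exercises the new path (EvalFunc.warn() delegates to this logger once Hadoop supplies a reporter):

    import java.io.IOException;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.PigWarning;
    import org.apache.pig.data.Tuple;

    public class MyUpper extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null) {
                // now counted under group "MyUpper" rather than PigWarning
                warn("null input, skipping record", PigWarning.UDF_WARNING_1);
                return null;
            }
            return ((String) input.get(0)).toUpperCase();
        }
    }
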
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Mon Feb 24 21:41:38 2014
@@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -56,14 +55,6 @@ public class PigInputFormat extends Inpu
public static final Log log = LogFactory
.getLog(PigInputFormat.class);
- private static final PathFilter hiddenFileFilter = new PathFilter() {
- @Override
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
public static final String PIG_INPUTS = "pig.inputs";
/**
@@ -77,6 +68,7 @@ public class PigInputFormat extends Inpu
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
+ @SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public org.apache.hadoop.mapreduce.RecordReader<Text, Tuple> createRecordReader(
org.apache.hadoop.mapreduce.InputSplit split,
@@ -127,7 +119,7 @@ public class PigInputFormat extends Inpu
* @throws IOException
*/
static void mergeSplitSpecificConf(LoadFunc loadFunc, PigSplit pigSplit, Configuration originalConf)
- throws IOException {
+ throws IOException {
// set up conf with entries from input specific conf
Job job = new Job(originalConf);
loadFunc.setLocation(getLoadLocation(pigSplit.getInputIndex(),
@@ -147,8 +139,8 @@ public class PigInputFormat extends Inpu
@SuppressWarnings("unchecked")
private static LoadFunc getLoadFunc(int inputIndex, Configuration conf) throws IOException {
ArrayList<FileSpec> inputs =
- (ArrayList<FileSpec>) ObjectSerializer.deserialize(
- conf.get(PIG_INPUTS));
+ (ArrayList<FileSpec>) ObjectSerializer.deserialize(
+ conf.get(PIG_INPUTS));
FuncSpec loadFuncSpec = inputs.get(inputIndex).getFuncSpec();
return (LoadFunc) PigContext.instantiateFuncFromSpec(loadFuncSpec);
}
@@ -156,8 +148,8 @@ public class PigInputFormat extends Inpu
@SuppressWarnings("unchecked")
private static String getLoadLocation(int inputIndex, Configuration conf) throws IOException {
ArrayList<FileSpec> inputs =
- (ArrayList<FileSpec>) ObjectSerializer.deserialize(
- conf.get(PIG_INPUTS));
+ (ArrayList<FileSpec>) ObjectSerializer.deserialize(
+ conf.get(PIG_INPUTS));
return inputs.get(inputIndex).getFileName();
}
@@ -174,8 +166,8 @@ public class PigInputFormat extends Inpu
static void passLoadSignature(LoadFunc loadFunc, int inputIndex,
Configuration conf) throws IOException {
List<String> inpSignatureLists =
- (ArrayList<String>)ObjectSerializer.deserialize(
- conf.get("pig.inpSignatures"));
+ (ArrayList<String>)ObjectSerializer.deserialize(
+ conf.get("pig.inpSignatures"));
// signature can be null for intermediate jobs where it will not
// be required to be passed down
if(inpSignatureLists.get(inputIndex) != null) {
@@ -189,10 +181,10 @@ public class PigInputFormat extends Inpu
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
*/
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public List<InputSplit> getSplits(JobContext jobcontext)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
Configuration conf = jobcontext.getConfiguration();
@@ -250,8 +242,8 @@ public class PigInputFormat extends Inpu
LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(
loadFuncSpec);
boolean combinable = !(loadFunc instanceof MergeJoinIndexer
- || loadFunc instanceof IndexableLoadFunc
- || (loadFunc instanceof CollectableLoadFunc && loadFunc instanceof OrderedLoadFunc));
+ || loadFunc instanceof IndexableLoadFunc
+ || (loadFunc instanceof CollectableLoadFunc && loadFunc instanceof OrderedLoadFunc));
if (combinable)
combinable = !conf.getBoolean("pig.noSplitCombination", false);
JobConf confClone = new JobConf(conf);
@@ -280,9 +272,9 @@ public class PigInputFormat extends Inpu
int errCode = 2118;
String msg = "Unable to create input splits for: " + inputs.get(i).getFileName();
if(e.getMessage() !=null && (!e.getMessage().isEmpty()) ){
- throw new ExecException(e.getMessage(), errCode, PigException.BUG, e);
+ throw new ExecException(e.getMessage(), errCode, PigException.BUG, e);
}else{
- throw new ExecException(msg, errCode, PigException.BUG, e);
+ throw new ExecException(msg, errCode, PigException.BUG, e);
}
}
}
@@ -311,7 +303,7 @@ public class PigInputFormat extends Inpu
protected List<InputSplit> getPigSplits(List<InputSplit> oneInputSplits,
int inputIndex, ArrayList<OperatorKey> targetOps, long blockSize, boolean combinable, Configuration conf)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
ArrayList<InputSplit> pigSplits = new ArrayList<InputSplit>();
if (!combinable) {
int splitIndex = 0;
@@ -328,7 +320,7 @@ public class PigInputFormat extends Inpu
// default is the block size
maxCombinedSplitSize = blockSize;
List<List<InputSplit>> combinedSplits =
- MapRedUtil.getCombinePigSplits(oneInputSplits, maxCombinedSplitSize, conf);
+ MapRedUtil.getCombinePigSplits(oneInputSplits, maxCombinedSplitSize, conf);
for (int i = 0; i < combinedSplits.size(); i++)
pigSplits.add(createPigSplit(combinedSplits.get(i), inputIndex, targetOps, i, conf));
return pigSplits;
@@ -336,7 +328,7 @@ public class PigInputFormat extends Inpu
}
private InputSplit createPigSplit(List<InputSplit> combinedSplits,
- int inputIndex, ArrayList<OperatorKey> targetOps, int splitIndex, Configuration conf)
+ int inputIndex, ArrayList<OperatorKey> targetOps, int splitIndex, Configuration conf)
{
PigSplit pigSplit = new PigSplit(combinedSplits.toArray(new InputSplit[0]), inputIndex, targetOps, splitIndex);
pigSplit.setConf(conf);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Mon Feb 24 21:41:38 2014
@@ -22,18 +22,22 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.OverwritableStoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -204,7 +208,21 @@ public class PigOutputFormat extends Out
// The above call should have update the conf in the JobContext
// to have the output location - now call checkOutputSpecs()
- of.checkOutputSpecs(jobContextCopy);
+ try {
+ of.checkOutputSpecs(jobContextCopy);
+ } catch (IOException ioe) {
+ boolean shouldThrowException = true;
+ if (sFunc instanceof OverwritableStoreFunc) {
+ if (((OverwritableStoreFunc) sFunc).shouldOverwrite()) {
+ if (ioe instanceof FileAlreadyExistsException
+ || ioe instanceof org.apache.hadoop.fs.FileAlreadyExistsException) {
+ shouldThrowException = false;
+ }
+ }
+ }
+ if (shouldThrowException)
+ throw ioe;
+ }
}
}
/**
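
OverwritableStoreFunc is the contract driving both this change and the cleanupOutput() calls in JobControlCompiler above: shouldOverwrite() opts the storer in, cleanupOutput(POStore, Job) removes stale output before the job runs, and a FileAlreadyExistsException from checkOutputSpecs() is then tolerated. A hedged sketch of an implementation (the class name is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.OverwritableStoreFunc;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
    import org.apache.pig.builtin.PigStorage;

    public class OverwritingStorage extends PigStorage implements OverwritableStoreFunc {
        @Override
        public boolean shouldOverwrite() {
            return true; // allow existing output to be replaced
        }

        @Override
        public void cleanupOutput(POStore store, Job job) throws IOException {
            Path out = new Path(store.getSFile().getFileName());
            FileSystem fs = out.getFileSystem(job.getConfiguration());
            if (fs.exists(out)) {
                fs.delete(out, true); // drop stale output before the new job writes
            }
        }
    }
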
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Feb 24 21:41:38 2014
@@ -125,11 +125,7 @@ public class POUserFunc extends Expressi
private void instantiateFunc(FuncSpec fSpec) {
this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
this.setSignature(signature);
- Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
- Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
-
- if(tmpS!=null)
- this.func.setInputSchema(tmpS);
+ this.setFuncInputSchema(signature);
if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
executor = new MonitoredUDFExecutor(func);
}
@@ -609,4 +605,17 @@ public class POUserFunc extends Expressi
this.func.setUDFContextSignature(signature);
}
}
+
+ /**
+ * Sets the EvalFunc's input schema based on the signature
+ * @param signature
+ */
+ public void setFuncInputSchema(String signature) {
+ Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
+ Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
+ if(tmpS!=null) {
+ this.func.setInputSchema(tmpS);
+ }
+ }
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Mon Feb 24 21:41:38 2014
@@ -369,7 +369,7 @@ public class POPartialAgg extends Physic
Result res = getOutput(entry.getKey(), valueTuple);
iter.remove();
addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
- numEntriesInTarget += valueTuple.size() - 1;
+ numEntriesInTarget++; // the aggregated result occupies a single entry in the target map
}
return numEntriesInTarget;
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Feb 24 21:41:38 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
@@ -32,9 +31,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -62,6 +59,14 @@ public class POStream extends PhysicalOp
protected boolean allOutputFromBinaryProcessed = false;
+ /**
+ * This flag indicates whether streaming is done through fetching. If set,
+ * {@link FetchLauncher} pulls the data out of the pipeline directly, so
+ * {@link #getNextTuple()} cannot wait for the endOfAllInput signal that
+ * map() or reduce() would otherwise raise on close().
+ */
+ private boolean isFetchable;
+
public POStream(OperatorKey k, ExecutableManager executableManager,
StreamingCommand command, Properties properties) {
super(k);
@@ -170,7 +175,7 @@ public class POStream extends PhysicalOp
// if we are here, we haven't consumed all input to be sent
// to the streaming binary - check if we are being called
// from close() on the map or reduce
- if(this.parentPlan.endOfAllInput) {
+ if(isFetchable || this.parentPlan.endOfAllInput) {
Result r = getNextHelper((Tuple)null);
if(r.returnStatus == POStatus.STATUS_EOP) {
// we have now seen *ALL* possible input
@@ -373,4 +378,19 @@ public class POStream extends PhysicalOp
}
return (Tuple) out;
}
+
+ /**
+ * @return true if streaming is done through fetching
+ */
+ public boolean isFetchable() {
+ return isFetchable;
+ }
+
+ /**
+ * @param isFetchable - whether fetching is applied on POStream
+ */
+ public void setFetchable(boolean isFetchable) {
+ this.isFetchable = isFetchable;
+ }
+
}
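
The call site for setFetchable() is not part of this diff; below is a hypothetical sketch of how a fetch-mode launcher such as FetchLauncher could mark every POStream before pulling tuples. FetchSetup and markStreamsFetchable are illustrative names.

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanHelper;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
    import org.apache.pig.impl.plan.VisitorException;

    public final class FetchSetup {
        // Fetch mode has no map()/reduce() close() to raise endOfAllInput,
        // so each POStream must drain the streaming binary from getNextTuple().
        public static void markStreamsFetchable(PhysicalPlan plan) throws VisitorException {
            for (POStream stream : PlanHelper.getPhysicalOperators(plan, POStream.class)) {
                stream.setFetchable(true);
            }
        }
    }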
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Feb 24 21:41:38 2014
@@ -42,7 +42,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.BinSedesTuple;
import org.apache.pig.data.SchemaTupleBackend;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.ReadScalarsTez;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.tez.common.TezUtils;
Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java?rev=1571454&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java Mon Feb 24 21:41:38 2014
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+public class ReadScalarsTez extends EvalFunc<Object> implements TezLoad {
+ private static final Log LOG = LogFactory.getLog(ReadScalarsTez.class);
+ private String inputKey;
+ private transient Tuple t;
+ private transient LogicalInput input;
+
+ public ReadScalarsTez(String inputKey) {
+ this.inputKey = inputKey;
+ }
+
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ String cacheKey = "scalar-" + inputKey;
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ inputsToSkip.add(inputKey);
+ }
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf) throws ExecException {
+ String cacheKey = "scalar-" + inputKey;
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ t = (Tuple) cacheValue;
+ return;
+ }
+ input = inputs.get(inputKey);
+ if (input == null) {
+ throw new ExecException("Input from vertex " + inputKey + " is missing");
+ }
+ try {
+ KeyValueReader reader = (KeyValueReader) input.getReader();
+ if (reader.next()) {
+ t = (Tuple) reader.getCurrentValue();
+ if (reader.next()) {
+ String msg = "Scalar has more than one row in the output. "
+ + "1st : " + t + ", 2nd :"
+ + reader.getCurrentValue();
+ throw new ExecException(msg);
+ }
+ } else {
+ LOG.info("Scalar input from vertex " + inputKey + " is null");
+ }
+ LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ ObjectCache.getInstance().cache(cacheKey, t);
+ LOG.info("Cached scalar in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
+ }
+
+ @Override
+ public Object exec(Tuple input) throws IOException {
+ if (t == null) {
+ // the scalar input had no rows; a scalar read from an empty relation is null
+ return null;
+ }
+ int pos = (Integer) input.get(0);
+ Object obj = t.get(pos);
+ return obj;
+ }
+}
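
Taken together, addInputsToSkip(), attachInputs() and exec() form the lifecycle a Tez task drives for this UDF. A hedged sketch of a caller is below; ScalarReadSketch and readScalarColumn are illustrative names, and the inputs map stands for whatever the Tez processor received.

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;
    import org.apache.tez.runtime.api.LogicalInput;

    public final class ScalarReadSketch {
        // Drive ReadScalarsTez: consult the cache, attach the vertex input,
        // then ask for one column of the single scalar row.
        public static Object readScalarColumn(ReadScalarsTez scalar,
                Map<String, LogicalInput> inputs, Configuration conf,
                int column) throws Exception {
            Set<String> skip = new HashSet<String>();
            scalar.addInputsToSkip(skip);      // populated when the row is already cached
            scalar.attachInputs(inputs, conf); // reads and caches the scalar row
            Tuple args = TupleFactory.getInstance().newTuple(1);
            args.set(0, column);               // exec() expects the column position at $0
            return scalar.exec(args);
        }
    }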
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Feb 24 21:41:38 2014
@@ -85,7 +85,6 @@ import org.apache.pig.impl.builtin.Defau
import org.apache.pig.impl.builtin.FindQuantiles;
import org.apache.pig.impl.builtin.GetMemNumRows;
import org.apache.pig.impl.builtin.PartitionSkewedKeys;
-import org.apache.pig.impl.builtin.ReadScalarsTez;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -236,22 +235,22 @@ public class TezCompiler extends PhyPlan
}
tezOper.setClosed(true);
}
-
+
fixScalar();
return tezPlan;
}
-
+
private void fixScalar() throws VisitorException, PlanException {
// Mapping POStore to POValueOutputTez
Map<POStore, POValueOutputTez> storeSeen = new HashMap<POStore, POValueOutputTez>();
-
+
for (TezOperator tezOp : tezPlan) {
List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
for (POUserFunc userFunc : userFuncs) {
if (userFunc.getReferencedOperator()!=null) { // Scalar
POStore store = (POStore)userFunc.getReferencedOperator();
-
+
TezOperator from = phyToTezOpMap.get(store);
FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
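
At runtime the rewired FuncSpec simply instantiates ReadScalarsTez with the producing vertex's operator key as its input key. A sketch of what that resolution amounts to is below; ScalarFuncSpecSketch is an illustrative name, while FuncSpec(String, String) and PigContext.instantiateFuncFromSpec are the same calls used elsewhere in this commit.

    import org.apache.pig.EvalFunc;
    import org.apache.pig.FuncSpec;
    import org.apache.pig.impl.PigContext;

    public final class ScalarFuncSpecSketch {
        // The one-argument constructor receives the operator key of the
        // vertex that produces the scalar, matching fixScalar() above.
        public static EvalFunc<?> resolve(String producingVertexKey) {
            FuncSpec spec = new FuncSpec(ReadScalarsTez.class.getName(), producingVertexKey);
            return (EvalFunc<?>) PigContext.instantiateFuncFromSpec(spec);
        }
    }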
@@ -323,7 +322,7 @@ public class TezCompiler extends PhyPlan
POValueOutputTez valueOutput = new POValueOutputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
storeTezOper.plan.addAsLeaf(valueOutput);
storeTezOper.setSplitter(true);
-
+
// Create a splittee of store only
TezOperator storeOnlyTezOperator = getTezOp();
PhysicalPlan storeOnlyPhyPlan = new PhysicalPlan();
@@ -334,7 +333,7 @@ public class TezCompiler extends PhyPlan
storeOnlyTezOperator.plan = storeOnlyPhyPlan;
tezPlan.add(storeOnlyTezOperator);
phyToTezOpMap.put(store, storeOnlyTezOperator);
-
+
// Create new operator as second splittee
curTezOp = getTezOp();
POValueInputTez valueInput2 = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -348,13 +347,13 @@ public class TezCompiler extends PhyPlan
edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
storeOnlyTezOperator.setRequestedParallelismByReference(storeTezOper);
-
+
edge = TezCompilerUtil.connect(tezPlan, storeTezOper, curTezOp);
edge.dataMovementType = DataMovementType.ONE_TO_ONE;
edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
curTezOp.setRequestedParallelismByReference(storeTezOper);
-
+
return;
}
@@ -1723,7 +1722,7 @@ public class TezCompiler extends PhyPlan
oper2.setGlobalSort(true);
opers[1] = oper2;
tezPlan.add(oper2);
-
+
long limit = sort.getLimit();
//TODO: TezOperator limit not used at all
oper2.limit = limit;
@@ -1738,7 +1737,7 @@ public class TezCompiler extends PhyPlan
}
oper2.setSortOrder(sortOrder);
}
-
+
identityInOutTez.setOutputKey(oper2.getOperatorKey().toString());
if (limit!=-1) {
@@ -1982,7 +1981,7 @@ public class TezCompiler extends PhyPlan
curTezOp.plan.add(pkg);
curTezOp.setRequestedParallelism(op.getRequestedParallelism());
phyToTezOpMap.put(op, curTezOp);
- // TODO: Use alias vertex that is introduced by TEZ-678
+ // TODO: Use alias vertex that is introduced by TEZ-678
} catch (Exception e) {
int errCode = 2034;
String msg = "Error compiling operator " + op.getClass().getSimpleName();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Feb 24 21:41:38 2014
@@ -201,7 +201,7 @@ public class TezDagBuilder extends TezOp
break;
}
}
-
+
List<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(from.plan,
POValueOutputTez.class);
if (!valueOutputs.isEmpty()) {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Feb 24 21:41:38 2014
@@ -168,7 +168,7 @@ public class TezLauncher extends Launche
SecondaryKeyOptimizerTez skOptimizer = new SecondaryKeyOptimizerTez(tezPlan);
skOptimizer.visit();
}
-
+
boolean isMultiQuery =
"true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Feb 24 21:41:38 2014
@@ -41,6 +41,7 @@ import org.apache.pig.FuncSpec;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -175,6 +176,50 @@ public class MapRedUtil {
}
}
+ /**
+ * Sets up output and log dir paths for a single-store streaming job
+ *
+ * @param st - POStore of the current job
+ * @param pigContext - the current PigContext
+ * @param conf - configuration to set the streaming dir properties on
+ * @throws IOException
+ */
+ public static void setupStreamingDirsConfSingle(POStore st, PigContext pigContext,
+ Configuration conf) throws IOException {
+ // set out filespecs
+ String outputPathString = st.getSFile().getFileName();
+ // logs can live under the output dir when it is schemeless or on hdfs://;
+ // otherwise fall back to a temporary location
+ if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
+ conf.set("pig.streaming.log.dir",
+ new Path(outputPathString, JobControlCompiler.LOG_DIR).toString());
+ }
+ else {
+ String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString();
+ Path tmpLocation = new Path(tmpLocationStr);
+ conf.set("pig.streaming.log.dir",
+ new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+ }
+ conf.set("pig.streaming.task.output.dir", outputPathString);
+ }
+
+ /**
+ * Sets up output and log dir paths for a multi-store streaming job
+ *
+ * @param pigContext - the current PigContext
+ * @param conf - configuration to set the streaming dir properties on
+ * @throws IOException
+ */
+ public static void setupStreamingDirsConfMulti(PigContext pigContext, Configuration conf)
+ throws IOException {
+
+ String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString();
+ Path tmpLocation = new Path(tmpLocationStr);
+ conf.set("pig.streaming.log.dir",
+ new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+ conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
+ }
+
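
A hedged sketch of how a job-setup path might choose between the two helpers follows; StreamingDirsSketch and chooseStreamingDirs are illustrative names, and the actual call sites are not shown in this section.

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
    import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
    import org.apache.pig.impl.PigContext;

    public final class StreamingDirsSketch {
        // Single-store jobs can keep streaming logs next to the job output;
        // multi-store jobs get a shared temporary location instead.
        public static void chooseStreamingDirs(List<POStore> stores, PigContext pigContext,
                Configuration conf) throws IOException {
            if (stores.size() == 1) {
                MapRedUtil.setupStreamingDirsConfSingle(stores.get(0), pigContext, conf);
            } else {
                MapRedUtil.setupStreamingDirsConfMulti(pigContext, conf);
            }
        }
    }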
public static FileSpec checkLeafIsStore(
PhysicalPlan plan,
PigContext pigContext) throws ExecException {