You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/02/24 22:41:41 UTC

svn commit: r1571454 [3/5] - in /pig/branches/tez: ./ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/ contrib/piggybank/java/src/main/java/...

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Feb 24 21:41:38 2014
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
@@ -31,10 +33,13 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +47,7 @@ import org.apache.hadoop.filecache.Distr
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.Counters;
@@ -54,6 +60,7 @@ import org.apache.hadoop.mapred.jobcontr
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.OverwritableStoreFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
@@ -84,6 +91,7 @@ import org.apache.pig.data.SchemaTupleFr
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.NullableBigDecimalWritable;
@@ -107,6 +115,7 @@ import org.apache.pig.impl.util.ObjectSe
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRScriptState;
 
 /**
@@ -133,9 +142,11 @@ import org.apache.pig.tools.pigstats.map
  * These are all just type specific instances of WritableComparator.
  *
  */
+@SuppressWarnings("deprecation")
 public class JobControlCompiler{
     MROperPlan plan;
     Configuration conf;
+    Configuration defaultConf;
     PigContext pigContext;
 
     private static final Matcher DISTRIBUTED_CACHE_ARCHIVE_MATCHER = Pattern
@@ -155,6 +166,9 @@ public class JobControlCompiler{
     public static final String PIG_MAP_SEPARATOR = "_";
     public HashMap<String, ArrayList<Pair<String,Long>>> globalCounters = new HashMap<String, ArrayList<Pair<String,Long>>>();
 
+    public static final String SMALL_JOB_LOG_MSG = "This job was detected as a small job, will run in-process instead";
+    public static final String BIG_JOB_LOG_MSG = "This job cannot be converted run in-process";
+
     /**
      * We will serialize the POStore(s) present in map and reduce in lists in
      * the Hadoop Conf. In the case of Multi stores, we could deduce these from
@@ -173,8 +187,13 @@ public class JobControlCompiler{
     private int counterSize;
 
     public JobControlCompiler(PigContext pigContext, Configuration conf) {
+        this(pigContext, conf, null);
+    }
+
+    public JobControlCompiler(PigContext pigContext, Configuration conf, Configuration defaultConf) {
         this.pigContext = pigContext;
         this.conf = conf;
+        this.defaultConf = defaultConf;
         jobStoreMap = new HashMap<Job, Pair<List<POStore>, Path>>();
         jobMroMap = new HashMap<Job, MapReduceOper>();
     }
@@ -278,15 +297,15 @@ public class JobControlCompiler{
         String defaultPigJobControlSleep = pigContext.getExecType().isLocal() ? "100" : "5000";
         String pigJobControlSleep = conf.get("pig.jobcontrol.sleep", defaultPigJobControlSleep);
         if (!pigJobControlSleep.equals(defaultPigJobControlSleep)) {
-          log.info("overriding default JobControl sleep (" + defaultPigJobControlSleep + ") to " + pigJobControlSleep);
+            log.info("overriding default JobControl sleep (" + defaultPigJobControlSleep + ") to " + pigJobControlSleep);
         }
 
         try {
-          timeToSleep = Integer.parseInt(pigJobControlSleep);
+            timeToSleep = Integer.parseInt(pigJobControlSleep);
         } catch (NumberFormatException e) {
-          throw new RuntimeException("Invalid configuration " +
-              "pig.jobcontrol.sleep=" + pigJobControlSleep +
-              " should be a time in ms. default=" + defaultPigJobControlSleep, e);
+            throw new RuntimeException("Invalid configuration " +
+                    "pig.jobcontrol.sleep=" + pigJobControlSleep +
+                    " should be a time in ms. default=" + defaultPigJobControlSleep, e);
         }
 
         JobControl jobCtrl = HadoopShims.newJobControl(grpName, timeToSleep);
@@ -362,7 +381,24 @@ public class JobControlCompiler{
 
         try {
             counters = HadoopShims.getCounters(job);
-            groupCounters = counters.getGroup(getGroupName(counters.getGroupNames()));
+
+            String groupName = getGroupName(counters.getGroupNames());
+            // In case the counter group was not found, we need to find
+            // out why. The only acceptable state is that the relation was
+            // empty.
+            if (groupName == null) {
+                Counter outputRecords =
+                    counters.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP)
+                    .getCounterForName(MRPigStatsUtil.MAP_OUTPUT_RECORDS);
+
+                if(outputRecords.getCounter() == 0) {
+                    globalCounters.put(operationID, new ArrayList<Pair<String, Long>>());
+                    return;
+                } else {
+                  throw new RuntimeException("Did not found RANK counter group for operationId: " + operationID);
+                }
+            }
+            groupCounters = counters.getGroup(groupName);
 
             Iterator<Counter> it = groupCounters.iterator();
             HashMap<Integer,Long> counterList = new HashMap<Integer, Long>();
@@ -399,6 +435,29 @@ public class JobControlCompiler{
         }
         return null;
     }
+
+    /**
+     * Decides whether this job may be converted to run in-process: auto-local must be enabled,
+     * a default conf must be available, the total input size must be non-negative and at most
+     * PIG_AUTO_LOCAL_INPUT_MAXBYTES, and no more than one reducer may be configured.
+     */
+    private boolean okToRunLocal(org.apache.hadoop.mapreduce.Job job, MapReduceOper mro, List<POLoad> lds) throws IOException {
+        Configuration conf = job.getConfiguration();
+        // getJob() copies defaultConf into conf on conversion; it is null with the two-arg constructor.
+        if (defaultConf == null || !conf.getBoolean(PigConfiguration.PIG_AUTO_LOCAL_ENABLED, false)) {
+            return false;
+        }
+        long totalInputFileSize = InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job);
+        long inputByteMax = conf.getLong(PigConfiguration.PIG_AUTO_LOCAL_INPUT_MAXBYTES, 100L * 1000 * 1000);
+        log.info("Size of input: " + totalInputFileSize +" bytes. Small job threshold: " + inputByteMax );
+        if (totalInputFileSize < 0 || totalInputFileSize > inputByteMax) {
+            return false;
+        }
+        int reducers = conf.getInt("mapred.reduce.tasks", 1);
+        log.info("No of reducers: " + reducers);
+        return reducers <= 1;
+    }
+
     /**
      * The method that creates the Job corresponding to a MapReduceOper.
      * The assumption is that
@@ -421,7 +480,7 @@ public class JobControlCompiler{
      * @return Job corresponding to mro
      * @throws JobCreationException
      */
-    @SuppressWarnings({ "unchecked", "deprecation" })
+    @SuppressWarnings({ "unchecked" })
     private Job getJob(MROperPlan plan, MapReduceOper mro, Configuration config, PigContext pigContext) throws JobCreationException{
         org.apache.hadoop.mapreduce.Job nwJob = null;
 
@@ -470,7 +529,6 @@ public class JobControlCompiler{
         }
 
         try{
-
             //Process the POLoads
             List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
 
@@ -487,10 +545,12 @@ public class JobControlCompiler{
             if(!mro.reducePlan.isEmpty()){
                 log.info("Reduce phase detected, estimating # of required reducers.");
                 adjustNumReducers(plan, mro, nwJob);
+            } else {
+                nwJob.setNumReduceTasks(0);
             }
 
             if(lds!=null && lds.size()>0){
-              for (POLoad ld : lds) {
+                for (POLoad ld : lds) {
                     //Store the target operators for tuples read
                     //from this input
                     List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
@@ -511,28 +571,50 @@ public class JobControlCompiler{
 
             if (!pigContext.inIllustrator && ! pigContext.getExecType().isLocal())
             {
+                if (okToRunLocal(nwJob, mro, lds)) {
+                    log.info(SMALL_JOB_LOG_MSG);
+                    // override with the default conf to run in local mode
+                    for (Entry<String, String> entry : defaultConf) {
+                        String key = entry.getKey();
+                        if (key.equals("mapred.reduce.tasks")) {
+                            // this must not be set back to the default in case it has been set to 0 for example.
+                            continue;
+                        }
+                        if (key.startsWith("fs.")) {
+                            // we don't want to change fs settings back
+                            continue;
+                        }
+                        String value = entry.getValue();
+                        if (conf.get(key) == null || !conf.get(key).equals(value)) {
+                            conf.set(key, value);
+                        }
+                    }
 
-                // Setup the DistributedCache for this job
-                for (URL extraJar : pigContext.extraJars) {
-                    log.debug("Adding jar to DistributedCache: " + extraJar.toString());
-                    Utils.putJarOnClassPathThroughDistributedCache(pigContext, conf, extraJar);
-                }
-                
-                for (String scriptJar : pigContext.scriptJars) {
-                    log.debug("Adding jar to DistributedCache: " + scriptJar.toString());
-                    Utils.putJarOnClassPathThroughDistributedCache(pigContext, conf, new File(scriptJar).toURI().toURL());
-                }
+                    conf.setBoolean(PigImplConstants.CONVERTED_TO_LOCAL, true);
+                } else {
+                    log.info(BIG_JOB_LOG_MSG);
+                    // Setup the DistributedCache for this job
+                    for (URL extraJar : pigContext.extraJars) {
+                        log.debug("Adding jar to DistributedCache: " + extraJar.toString());
+                        putJarOnClassPathThroughDistributedCache(pigContext, conf, extraJar);
+                    }
 
-                //Create the jar of all functions and classes required
-                File submitJarFile = File.createTempFile("Job", ".jar");
-                log.info("creating jar file "+submitJarFile.getName());
-                // ensure the job jar is deleted on exit
-                submitJarFile.deleteOnExit();
-                FileOutputStream fos = new FileOutputStream(submitJarFile);
-                JarManager.createJar(fos, mro.UDFs, pigContext);
-                log.info("jar file "+submitJarFile.getName()+" created");
-                //Start setting the JobConf properties
-                conf.set("mapred.jar", submitJarFile.getPath());
+                    for (String scriptJar : pigContext.scriptJars) {
+                        log.debug("Adding jar to DistributedCache: " + scriptJar.toString());
+                        putJarOnClassPathThroughDistributedCache(pigContext, conf, new File(scriptJar).toURI().toURL());
+                    }
+
+                    //Create the jar of all functions and classes required
+                    File submitJarFile = File.createTempFile("Job", ".jar");
+                    log.info("creating jar file "+submitJarFile.getName());
+                    // ensure the job jar is deleted on exit
+                    submitJarFile.deleteOnExit();
+                    FileOutputStream fos = new FileOutputStream(submitJarFile);
+                    JarManager.createJar(fos, mro.UDFs, pigContext);
+                    log.info("jar file "+submitJarFile.getName()+" created");
+                    //Start setting the JobConf properties
+                    conf.set("mapred.jar", submitJarFile.getPath());
+                }
             }
             conf.set("pig.inputs", ObjectSerializer.serialize(inp));
             conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
@@ -553,46 +635,80 @@ public class JobControlCompiler{
                 // and set the hadoop job priority.
                 String jobPriority = pigContext.getProperties().getProperty(PigContext.JOB_PRIORITY).toUpperCase();
                 try {
-                  // Allow arbitrary case; the Hadoop job priorities are all upper case.
-                  conf.set("mapred.job.priority", JobPriority.valueOf(jobPriority).toString());
+                    // Allow arbitrary case; the Hadoop job priorities are all upper case.
+                    conf.set("mapred.job.priority", JobPriority.valueOf(jobPriority).toString());
 
                 } catch (IllegalArgumentException e) {
-                  StringBuffer sb = new StringBuffer("The job priority must be one of [");
-                  JobPriority[] priorities = JobPriority.values();
-                  for (int i = 0; i < priorities.length; ++i) {
-                    if (i > 0)  sb.append(", ");
-                    sb.append(priorities[i]);
-                  }
-                  sb.append("].  You specified [" + jobPriority + "]");
-                  throw new JobCreationException(sb.toString());
+                    StringBuffer sb = new StringBuffer("The job priority must be one of [");
+                    JobPriority[] priorities = JobPriority.values();
+                    for (int i = 0; i < priorities.length; ++i) {
+                        if (i > 0)  sb.append(", ");
+                        sb.append(priorities[i]);
+                    }
+                    sb.append("].  You specified [" + jobPriority + "]");
+                    throw new JobCreationException(sb.toString());
                 }
             }
 
-            setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(),
-                                  "pig.streaming.ship.files", true);
-            setupDistributedCache(pigContext, nwJob.getConfiguration(), pigContext.getProperties(),
-                                  "pig.streaming.cache.files", false);
+            setupDistributedCache(pigContext, conf, pigContext.getProperties(),
+                    "pig.streaming.ship.files", true);
+            setupDistributedCache(pigContext, conf, pigContext.getProperties(),
+                    "pig.streaming.cache.files", false);
 
             nwJob.setInputFormatClass(PigInputFormat.class);
 
+            // tmp file compression setups
+            // PIG-3741 This must be done before setStoreLocation on POStores
+            Utils.setTmpFileCompressionOnConf(pigContext, conf);
+
             //Process POStore and remove it from the plan
             LinkedList<POStore> mapStores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
             LinkedList<POStore> reduceStores = PlanHelper.getPhysicalOperators(mro.reducePlan, POStore.class);
 
-            for (POStore st: mapStores) {
+            for (POStore st : mapStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
                 sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+                if (sFunc instanceof OverwritableStoreFunc) {
+                    OverwritableStoreFunc osf = (OverwritableStoreFunc) sFunc;
+                    if (osf.shouldOverwrite()) {
+                        osf.cleanupOutput(st, nwJob);
+                    }
+                }
             }
 
-            for (POStore st: reduceStores) {
+            for (POStore st : reduceStores) {
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
                 sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+                if (sFunc instanceof OverwritableStoreFunc) {
+                    OverwritableStoreFunc osf = (OverwritableStoreFunc) sFunc;
+                    if (osf.shouldOverwrite()) {
+                        osf.cleanupOutput(st, nwJob);
+                    }
+                }
             }
 
-            // the OutputFormat we report to Hadoop is always PigOutputFormat
-            nwJob.setOutputFormatClass(PigOutputFormat.class);
+            // the OutputFormat we report to Hadoop is always PigOutputFormat, which
+            // is wrapped with LazyOutputFormat if that class is supported by
+            // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+            if ("true".equalsIgnoreCase(conf.get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+                try {
+                    Class<?> clazz = PigContext.resolveClassName(
+                            "org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
+                    Method method = clazz.getMethod("setOutputFormatClass", nwJob.getClass(),
+                            Class.class);
+                    method.invoke(null, nwJob, PigOutputFormat.class);
+                }
+                catch (Exception e) {
+                    nwJob.setOutputFormatClass(PigOutputFormat.class);
+                    log.warn(PigConfiguration.PIG_OUTPUT_LAZY
+                            + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
+                }
+            }
+            else {
+                nwJob.setOutputFormatClass(PigOutputFormat.class);
+            }
 
             if (mapStores.size() + reduceStores.size() == 1) { // single store case
                 log.info("Setting up single store job");
@@ -608,28 +724,12 @@ public class JobControlCompiler{
                     if(!pigContext.inIllustrator)
                         mro.reducePlan.remove(st);
                 }
-
-                // set out filespecs
-                String outputPathString = st.getSFile().getFileName();
-                if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
-                    conf.set("pig.streaming.log.dir",
-                            new Path(outputPathString, LOG_DIR).toString());
-                } else {
-                    String tmpLocationStr =  FileLocalizer
-                    .getTemporaryPath(pigContext).toString();
-                    tmpLocation = new Path(tmpLocationStr);
-                    conf.set("pig.streaming.log.dir",
-                            new Path(tmpLocation, LOG_DIR).toString());
-                }
-                conf.set("pig.streaming.task.output.dir", outputPathString);
+                
+                MapRedUtil.setupStreamingDirsConfSingle(st, pigContext, conf);
             }
-           else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
+            else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
                 log.info("Setting up multi store job");
-                String tmpLocationStr =  FileLocalizer
-                .getTemporaryPath(pigContext).toString();
-                tmpLocation = new Path(tmpLocationStr);
-
-                nwJob.setOutputFormatClass(PigOutputFormat.class);
+                MapRedUtil.setupStreamingDirsConfMulti(pigContext, conf);
 
                 boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
                 if (disableCounter) {
@@ -641,11 +741,7 @@ public class JobControlCompiler{
                     sto.setMultiStore(true);
                     sto.setIndex(idx++);
                 }
-
-                conf.set("pig.streaming.log.dir",
-                            new Path(tmpLocation, LOG_DIR).toString());
-                conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
-           }
+            }
 
             // store map key type
             // this is needed when the key is null to create
@@ -672,7 +768,6 @@ public class JobControlCompiler{
             if(mro.reducePlan.isEmpty()){
                 //MapOnly Job
                 nwJob.setMapperClass(PigMapOnly.Map.class);
-                nwJob.setNumReduceTasks(0);
                 if(!pigContext.inIllustrator)
                     conf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
                 if(mro.isEndOfAllInputSetInMap()) {
@@ -775,7 +870,7 @@ public class JobControlCompiler{
 
                 } else {
                     conf.set("pig.sortOrder",
-                        ObjectSerializer.serialize(mro.getSortOrder()));
+                            ObjectSerializer.serialize(mro.getSortOrder()));
                 }
             }
 
@@ -820,9 +915,6 @@ public class JobControlCompiler{
                 conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
             }
 
-            // tmp file compression setups
-            Utils.setTmpFileCompressionOnConf(pigContext, conf);
-
             String tmp;
             long maxCombinedSplitSize = 0;
             if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false"))
@@ -844,12 +936,12 @@ public class JobControlCompiler{
                 if (newfiles!=null) {
                     String files = conf.get("mapreduce.job.cache.files");
                     conf.set("mapreduce.job.cache.files",
-                        files == null ? newfiles.toString() : files + "," + newfiles);
+                            files == null ? newfiles.toString() : files + "," + newfiles);
                 }
             }
             // Serialize the UDF specific context info.
             UDFContext.getUDFContext().serialize(conf);
-            Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList<Job>());
+            Job cjob = new Job(new JobConf(conf), new ArrayList<Job>());
             jobStoreMap.put(cjob,new Pair<List<POStore>, Path>(storeLocations, tmpLocation));
             return cjob;
 
@@ -886,7 +978,7 @@ public class JobControlCompiler{
 
             // set the runtime #reducer of the next job as the #partition
             ParallelConstantVisitor visitor =
-              new ParallelConstantVisitor(mro.reducePlan, nPartitions);
+                    new ParallelConstantVisitor(mro.reducePlan, nPartitions);
             visitor.visit();
         }
         log.info("Setting Parallelism to " + jobParallelism);
@@ -931,7 +1023,7 @@ public class JobControlCompiler{
             } else {
                 // reducer estimation could return -1 if it couldn't estimate
                 log.info("Could not estimate number of reducers and no requested or default " +
-                         "parallelism set. Defaulting to 1 reducer.");
+                        "parallelism set. Defaulting to 1 reducer.");
                 jobParallelism = 1;
             }
         }
@@ -949,17 +1041,17 @@ public class JobControlCompiler{
      * @throws IOException
      */
     public static int estimateNumberOfReducers(org.apache.hadoop.mapreduce.Job job,
-                                               MapReduceOper mapReducerOper) throws IOException {
+            MapReduceOper mapReducerOper) throws IOException {
         Configuration conf = job.getConfiguration();
 
         PigReducerEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ?
-          new InputSizeReducerEstimator() :
-          PigContext.instantiateObjectFromParams(conf,
-                  REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
-
-        log.info("Using reducer estimator: " + estimator.getClass().getName());
-        int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReducerOper);
-        return numberOfReducers;
+                new InputSizeReducerEstimator() :
+                    PigContext.instantiateObjectFromParams(conf,
+                            REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, PigReducerEstimator.class);
+
+                log.info("Using reducer estimator: " + estimator.getClass().getName());
+                int numberOfReducers = estimator.estimateNumberOfReducers(job, mapReducerOper);
+                return numberOfReducers;
     }
 
     public static class PigSecondaryKeyGroupComparator extends WritableComparator {
@@ -1344,31 +1436,31 @@ public class JobControlCompiler{
             PigContext pigContext, Configuration conf) throws IOException {
 
         new JoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf)
-                .visit();
+        .visit();
 
         new JoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf)
-                .visit();
+        .visit();
     }
 
     private void setupDistributedCacheForUdfs(MapReduceOper mro,
-                                              PigContext pigContext,
-                                              Configuration conf) throws IOException {
+            PigContext pigContext,
+            Configuration conf) throws IOException {
         new UdfDistributedCacheVisitor(mro.mapPlan, pigContext, conf).visit();
         new UdfDistributedCacheVisitor(mro.reducePlan, pigContext, conf).visit();
     }
 
     private static void setupDistributedCache(PigContext pigContext,
-                                              Configuration conf,
-                                              Properties properties, String key,
-                                              boolean shipToCluster)
-    throws IOException {
+            Configuration conf,
+            Properties properties, String key,
+            boolean shipToCluster)
+                    throws IOException {
         // Set up the DistributedCache for this job
         String fileNames = properties.getProperty(key);
 
-        if (fileNames != null) {
-            String[] paths = fileNames.split(",");
-            setupDistributedCache(pigContext, conf, paths, shipToCluster);
-        }
+                if (fileNames != null) {
+                    String[] paths = fileNames.split(",");
+                    setupDistributedCache(pigContext, conf, paths, shipToCluster);
+                }
     }
 
     private static void addToDistributedCache(URI uri, Configuration conf) {
@@ -1396,7 +1488,7 @@ public class JobControlCompiler{
                 // DistributedCache
                 if (shipToCluster) {
                     Path dst =
-                        new Path(FileLocalizer.getTemporaryPath(pigContext).toString());
+                            new Path(FileLocalizer.getTemporaryPath(pigContext).toString());
                     FileSystem fs = dst.getFileSystem(conf);
                     fs.copyFromLocalFile(src, dst);
 
@@ -1419,7 +1511,7 @@ public class JobControlCompiler{
                             break;
                         }
                         String msg = "Invalid ship specification. " +
-                        "File doesn't exist: " + dst;
+                                "File doesn't exist: " + dst;
                         throw new ExecException(msg, errCode, errSrc);
                     }
                     addToDistributedCache(dstURI, conf);
@@ -1430,6 +1522,10 @@ public class JobControlCompiler{
         }
     }
 
+    private static boolean isLocal(PigContext pigContext, Configuration conf) { // local exec type, or conf flagged CONVERTED_TO_LOCAL
+        return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
+    }
+
     private static String addSingleFileToDistributedCache(
             PigContext pigContext, Configuration conf, String filename,
             String prefix) throws IOException {
@@ -1437,14 +1533,14 @@ public class JobControlCompiler{
         if (!pigContext.inIllustrator && !FileLocalizer.fileExists(filename, pigContext)) {
             throw new IOException(
                     "Internal error: skew join partition file "
-                    + filename + " does not exist");
+                            + filename + " does not exist");
         }
 
         String symlink = filename;
 
         // XXX Hadoop currently doesn't support distributed cache in local mode.
         // This line will be removed after the support is added by Hadoop team.
-        if (!pigContext.getExecType().isLocal()) {
+        if (!isLocal(pigContext, conf)) {
             symlink = prefix + "_"
                     + Integer.toString(System.identityHashCode(filename)) + "_"
                     + Long.toString(System.currentTimeMillis());
@@ -1487,6 +1583,118 @@ public class JobControlCompiler{
         }
     }
 
+    /**
+     * Ships the jar at the given url to HDFS, adds the shipped copy to the job classpath
+     * through the distributed cache, and records the url's path in pigContext.skipJars.
+     * @param pigContext the pigContext
+     * @param conf the job conf
+     * @param url the url of the jar to be added to the distributed cache
+     * @throws IOException
+     */
+    @SuppressWarnings("deprecation")
+    private static void putJarOnClassPathThroughDistributedCache(
+            PigContext pigContext,
+            Configuration conf,
+            URL url) throws IOException {
+
+        // Turn on the symlink feature
+        DistributedCache.createSymlink(conf);
+
+        // REGISTER always copies the jar file locally; see PigServer.registerJar()
+        Path pathInHDFS = shipToHDFS(pigContext, conf, url);
+        // and add to the DistributedCache
+        DistributedCache.addFileToClassPath(pathInHDFS, conf);
+        pigContext.skipJars.add(url.getPath());
+    }
+
+    private static Path getCacheStagingDir(Configuration conf) throws IOException {
+        String pigTempDir = conf.get(PigConfiguration.PIG_USER_CACHE_LOCATION,
+                conf.get(PigConfiguration.PIG_TEMP_DIR, "/tmp"));
+        String currentUser = System.getProperty("user.name");
+        Path stagingDir = new Path(pigTempDir + "/" + currentUser + "/", ".pigcache");
+        FileSystem fs = FileSystem.get(conf);
+        fs.mkdirs(stagingDir);
+        fs.setPermission(stagingDir, FileLocalizer.OWNER_ONLY_PERMS);
+        return stagingDir;
+    }
+
+    private static Path getFromCache(PigContext pigContext,
+            Configuration conf,
+            URL url) throws IOException {
+        try {
+            Path stagingDir = getCacheStagingDir(conf);
+            String filename = FilenameUtils.getName(url.getPath());
+
+            String checksum = DigestUtils.shaHex(url.openStream());
+            FileSystem fs = FileSystem.get(conf);
+            Path cacheDir = new Path(stagingDir, checksum);
+            FileStatus [] statuses = fs.listStatus(cacheDir);
+            if (statuses != null) {
+                for (FileStatus stat : statuses) {
+                    Path jarPath = stat.getPath();
+                    if(jarPath.getName().equals(filename)) {
+                        log.info("Found " + url + " in jar cache at "+ stagingDir);
+                        long curTime = System.currentTimeMillis();
+                        fs.setTimes(jarPath, -1, curTime);
+                        return jarPath;
+                    }
+                }
+            }
+            log.info("Url "+ url + " was not found in jarcache at "+ stagingDir);
+            // attempt to copy the jar into the cache; on failure the catch below returns null
+            fs.mkdirs(cacheDir, FileLocalizer.OWNER_ONLY_PERMS);
+            Path cacheFile = new Path(cacheDir, filename);
+            OutputStream os = FileSystem.create(fs, cacheFile, FileLocalizer.OWNER_ONLY_PERMS);
+            try {
+                IOUtils.copyBytes(url.openStream(), os, 4096, true);
+            } finally {
+                os.close();
+            }
+            return cacheFile;
+
+        } catch (IOException ioe) {
+            log.info("Unable to retrieve jar from jar cache ", ioe);
+            return null;
+        }
+    }
+
+    /**
+     * copy the file to hdfs: into the user jar cache when it is enabled and usable, otherwise into a temporary path
+     * @param pigContext the pig context
+     * @param conf the job conf
+     * @param url the url to ship to hdfs
+     * @return the location where it was shipped
+     * @throws IOException
+     */
+    private static Path shipToHDFS(
+            PigContext pigContext,
+            Configuration conf,
+            URL url) throws IOException {
+
+        boolean cacheEnabled =
+                conf.getBoolean(PigConfiguration.PIG_USER_CACHE_ENABLED, false);
+        if (cacheEnabled) {
+            Path pathOnDfs = getFromCache(pigContext, conf, url);
+            if(pathOnDfs != null) {
+                return pathOnDfs;
+            }
+        }
+        String suffix = FilenameUtils.getName(url.getPath());
+
+        Path dst = new Path(FileLocalizer.getTemporaryPath(pigContext).toUri().getPath(), suffix);
+        FileSystem fs = dst.getFileSystem(conf);
+        OutputStream os = fs.create(dst);
+        try {
+            IOUtils.copyBytes(url.openStream(), os, 4096, true);
+        } finally {
+            // IOUtils.copyBytes cannot be relied on to close the output stream by itself:
+            // url.openStream() may throw after os is opened but before copyBytes is called
+            os.close();
+        }
+        return dst;
+    }
+
+
     private static class JoinDistributedCacheVisitor extends PhyPlanVisitor {
 
         private PigContext pigContext = null;
@@ -1506,7 +1714,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (pigContext.getExecType().isLocal()) return;
+            if (isLocal(pigContext, conf)) return;
 
             // set up distributed cache for the replicated files
             FileSpec[] replFiles = join.getReplFiles();
@@ -1524,8 +1732,8 @@ public class JobControlCompiler{
                         symlink = "pigrepl_" + join.getOperatorKey().toString() + "_"
                                 + Integer.toString(System.identityHashCode(
                                         replFiles[i].getFileName()))
-                                + "_" + Long.toString(System.currentTimeMillis())
-                                + "_" + i;
+                                        + "_" + Long.toString(System.currentTimeMillis())
+                                        + "_" + i;
                         replicatedPath.add(replFiles[i].getFileName() + "#"
                                 + symlink);
 
@@ -1562,7 +1770,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (pigContext.getExecType().isLocal()) return;
+            if (isLocal(pigContext, conf)) return;
 
             String indexFile = join.getIndexFile();
 
@@ -1586,7 +1794,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (pigContext.getExecType().isLocal()) return;
+            if (isLocal(pigContext, conf)) return;
 
             String indexFile = mergeCoGrp.getIndexFileName();
 
@@ -1623,7 +1831,7 @@ public class JobControlCompiler{
 
             // XXX Hadoop currently doesn't support distributed cache in local mode.
             // This line will be removed after the support is added
-            if (pigContext.getExecType().isLocal()) return;
+            if (isLocal(pigContext, conf)) return;
 
             // set up distributed cache for files indicated by the UDF
             String[] files = func.getCacheFiles();
@@ -1659,7 +1867,7 @@ public class JobControlCompiler{
                     if (replaced) {
                         // sample job should have only one ConstantExpression
                         throw new VisitorException("Invalid reduce plan: more " +
-                                       "than one ConstantExpression found in sampling job");
+                                "than one ConstantExpression found in sampling job");
                     }
                     cnst.setValue(rp);
                     cnst.setRequestedParallelism(rp);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Feb 24 21:41:38 2014
@@ -42,6 +42,7 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.OrderedLoadFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -1721,12 +1722,12 @@ public class MRCompiler extends PhyPlanV
             String msg = "Error compiling operator " + joinOp.getClass().getCanonicalName();
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
-       catch (IOException e){
+        catch (IOException e){
            int errCode = 3000;
            String errMsg = "IOException caught while compiling POMergeJoin";
             throw new MRCompilerException(errMsg, errCode,e);
         }
-       catch(CloneNotSupportedException e){
+        catch(CloneNotSupportedException e){
            int errCode = 2127;
            String errMsg = "Cloning exception caught while compiling POMergeJoin";
            throw new MRCompilerException(errMsg, errCode, PigException.BUG, e);
@@ -2347,13 +2348,14 @@ public class MRCompiler extends PhyPlanV
               FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans,
               String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
 
-          String[] rslargs = new String[2];
+        String[] rslargs = new String[2];
         // SampleLoader expects string version of FuncSpec
         // as its first constructor argument.
 
         rslargs[0] = (new FuncSpec(Utils.getTmpFileCompressorName(pigContext))).toString();
-
-        rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
+        // This value is only used by order by. For skewed join, it's calculated
+        // based on the file size.
+        rslargs[1] = pigContext.getProperties().getProperty(PigConfiguration.PIG_RANDOM_SAMPLER_SAMPLE_SIZE, "100");
         FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
                 new FuncSpec(sampleLdrClassName, rslargs));
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Mon Feb 24 21:41:38 2014
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.transform.TransformerException;
@@ -49,6 +50,7 @@ import org.apache.pig.backend.BackendExc
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler.LastInputStreamingOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.DotMRPrinter;
@@ -89,7 +91,7 @@ public class MapReduceLauncher extends L
     public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
 
     public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
-        "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+            "mapreduce.fileoutputcommitter.marksuccessfuljobs";
 
     private static final Log log = LogFactory.getLog(MapReduceLauncher.class);
 
@@ -141,32 +143,36 @@ public class MapReduceLauncher extends L
         return failureMap.get(spec);
     }
 
-    @SuppressWarnings("deprecation")
     @Override
     public PigStats launchPig(PhysicalPlan php,
-                              String grpName,
-                              PigContext pc) throws PlanException,
-                                                    VisitorException,
-                                                    IOException,
-                                                    ExecException,
-                                                    JobCreationException,
-                                                    Exception {
+            String grpName,
+            PigContext pc) throws PlanException,
+            VisitorException,
+            IOException,
+            ExecException,
+            JobCreationException,
+            Exception {
         long sleepTime = 500;
-        aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
+        aggregateWarning = Boolean.valueOf(pc.getProperties().getProperty("aggregate.warning"));
         MROperPlan mrp = compile(php, pc);
 
         ConfigurationValidator.validatePigProperties(pc.getProperties());
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
 
         MRExecutionEngine exe = (MRExecutionEngine) pc.getExecutionEngine();
-        JobClient jobClient = new JobClient(exe.getJobConf());
+        Properties defaultProperties = new Properties();
+        JobConf defaultJobConf = exe.getLocalConf(defaultProperties);
+        Utils.recomputeProperties(defaultJobConf, defaultProperties);
+
+        // This is a generic JobClient for checking progress of the jobs
+        JobClient statsJobClient = new JobClient(exe.getJobConf());
 
-        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
+        JobControlCompiler jcc = new JobControlCompiler(pc, conf, ConfigurationUtil.toConfiguration(defaultProperties));
 
         MRScriptState.get().addWorkflowAdjacenciesToConf(mrp, conf);
 
         // start collecting statistics
-        MRPigStatsUtil.startCollection(pc, jobClient, jcc, mrp);
+        MRPigStatsUtil.startCollection(pc, statsJobClient, jcc, mrp);
 
         // Find all the intermediate data stores. The plan will be destroyed during compile/execution
         // so this needs to be done before.
@@ -187,7 +193,7 @@ public class MapReduceLauncher extends L
         JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
 
         boolean stop_on_failure =
-            pc.getProperties().getProperty("stop.on.failure", "false").equals("true");
+            Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false"));
 
         // jc is null only when mrp.size == 0
         while(mrp.size() != 0) {
@@ -210,14 +216,14 @@ public class MapReduceLauncher extends L
                             failedNativeMR.add(natOp);
 
                             String msg = "Error running native mapreduce" +
-                            " operator job :" + natOp.getJobId() + e.getMessage();
+                                    " operator job :" + natOp.getJobId() + e.getMessage();
 
                             String stackTrace = Utils.getStackStraceStr(e);
                             LogUtils.writeLog(msg,
                                     stackTrace,
                                     pc.getProperties().getProperty("pig.logfile"),
                                     log
-                            );
+                                    );
                             log.info(msg);
 
                             if (stop_on_failure) {
@@ -250,10 +256,10 @@ public class MapReduceLauncher extends L
             JobConf jobConf = jobsWithoutIds.get(0).getJobConf();
             try {
                 String port = jobConf.get("mapred.job.tracker.http.address");
-                String jobTrackerAdd = jobConf.get(MRExecutionEngine.JOB_TRACKER_LOCATION);
+                String jobTrackerAdd = jobConf.get(HExecutionEngine.JOB_TRACKER_LOCATION);
 
                 jobTrackerLoc = jobTrackerAdd.substring(0,jobTrackerAdd.indexOf(":"))
-                + port.substring(port.indexOf(":"));
+                        + port.substring(port.indexOf(":"));
             }
             catch(Exception e){
                 // Could not get the job tracker location, most probably we are running in local mode.
@@ -336,7 +342,7 @@ public class MapReduceLauncher extends L
                     }
                     jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);
 
-                    double prog = (numMRJobsCompl+calculateProgress(jc, jobClient))/totalMRJobs;
+                    double prog = (numMRJobsCompl+calculateProgress(jc, statsJobClient))/totalMRJobs;
                     if (notifyProgress(prog, lastProg)) {
                         lastProg = prog;
                     }
@@ -363,8 +369,8 @@ public class MapReduceLauncher extends L
                         if (jobControlExceptionStackTrace != null) {
                             LogUtils.writeLog("Error message from job controller",
                                     jobControlExceptionStackTrace, pc
-                                            .getProperties().getProperty(
-                                                    "pig.logfile"), log);
+                                    .getProperties().getProperty(
+                                            "pig.logfile"), log);
                         }
                         throw jobControlException;
                     } else {
@@ -418,10 +424,13 @@ public class MapReduceLauncher extends L
             failed = true;
         }
 
-        if (!"false".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.PIG_DELETE_TEMP_FILE))) {
+        if (Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.PIG_DELETE_TEMP_FILE, "true"))) {
             // Clean up all the intermediate data
             for (String path : intermediateVisitor.getIntermediate()) {
-                FileLocalizer.delete(path, pc);
+                // Skip non-file system paths such as hbase, see PIG-3617
+                if (Utils.hasFileSystemImpl(new Path(path), conf)) {
+                    FileLocalizer.delete(path, pc);
+                }
             }
         }
 
@@ -431,7 +440,7 @@ public class MapReduceLauncher extends L
             Exception backendException = null;
             for (Job fj : failedJobs) {
                 try {
-                    getStats(fj, jobClient, true, pc);
+                    getStats(fj, statsJobClient, true, pc);
                 } catch (Exception e) {
                     backendException = e;
                 }
@@ -471,9 +480,9 @@ public class MapReduceLauncher extends L
                     }
                 }
 
-                getStats(job, jobClient, false, pc);
+                getStats(job, statsJobClient, false, pc);
                 if (aggregateWarning) {
-                    computeWarningAggregate(job, jobClient, warningAggMap);
+                    computeWarningAggregate(job, statsJobClient, warningAggMap);
                 }
             }
 
@@ -496,8 +505,8 @@ public class MapReduceLauncher extends L
 
         int ret = failed ? ((succJobs != null && succJobs.size() > 0)
                 ? ReturnCode.PARTIAL_FAILURE
-                : ReturnCode.FAILURE)
-                : ReturnCode.SUCCESS;
+                        : ReturnCode.FAILURE)
+                        : ReturnCode.SUCCESS;
 
         PigStats pigStats = PigStatsUtil.getPigStats(ret);
         // run cleanup for all of the stores
@@ -574,7 +583,7 @@ public class MapReduceLauncher extends L
             PrintStream ps,
             String format,
             boolean verbose) throws PlanException, VisitorException,
-                                   IOException {
+            IOException {
         log.trace("Entering MapReduceLauncher.explain");
         MROperPlan mrp = compile(php, pc);
 
@@ -636,9 +645,9 @@ public class MapReduceLauncher extends L
 
         // We must ensure that there is only 1 reducer for a limit. Add a single-reducer job.
         if (!pc.inIllustrator) {
-        LimitAdjuster la = new LimitAdjuster(plan, pc);
-        la.visit();
-        la.adjust();
+            LimitAdjuster la = new LimitAdjuster(plan, pc);
+            la.visit();
+            la.adjust();
         }
         // Optimize to use secondary sort key if possible
         prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
@@ -653,7 +662,7 @@ public class MapReduceLauncher extends L
 
         // optimize joins
         LastInputStreamingOptimizer liso =
-            new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
+                new MRCompiler.LastInputStreamingOptimizer(plan, lastInputChunkSize);
         liso.visit();
 
         // figure out the type of the key for the map plan
@@ -668,7 +677,7 @@ public class MapReduceLauncher extends L
         fRem.visit();
 
         boolean isMultiQuery =
-            "true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
+            Boolean.valueOf(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
 
         if (isMultiQuery) {
             // reduces the number of MROpers in the MR plan generated
@@ -690,7 +699,7 @@ public class MapReduceLauncher extends L
         checker.visit();
 
         boolean isAccum =
-            "true".equalsIgnoreCase(pc.getProperties().getProperty("opt.accumulator","true"));
+            Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator","true"));
         if (isAccum) {
             AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
             accum.visit();
@@ -700,7 +709,7 @@ public class MapReduceLauncher extends L
 
     private boolean shouldMarkOutputDir(Job job) {
         return job.getJobConf().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
-                               false);
+                false);
     }
 
     private void createSuccessFile(Job job, POStore store) throws IOException {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Mon Feb 24 21:41:38 2014
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Mon Feb 24 21:41:38 2014
@@ -17,26 +17,31 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
+import java.util.Map;
+import java.util.WeakHashMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 /**
- * 
+ *
  * A singleton class that implements the PigLogger interface
  * for use in map reduce context. Provides ability to aggregate
  * warning messages
  */
 public final class PigHadoopLogger implements PigLogger {
-    private static PigHadoopLogger instance = new PigHadoopLogger();  
-    
-    public static synchronized PigHadoopLogger getInstance() {
-        if (instance == null) {
-            instance = new PigHadoopLogger();
-        }
-        return instance;
-    } 
+    private static class PigHadoopLoggerHelper {
+        private static PigHadoopLogger instance = new PigHadoopLogger();
+    }
+
+    public static PigHadoopLogger getInstance() {
+        return PigHadoopLoggerHelper.instance;
+    }
 
     private static Log log = LogFactory.getLog(PigHadoopLogger.class);
 
@@ -44,21 +49,33 @@ public final class PigHadoopLogger imple
 
     private boolean aggregate = false;
 
+    private Map<Object, String> msgMap = new WeakHashMap<Object, String>();
+
     private PigHadoopLogger() {
-    }    
+    }
 
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings("rawtypes")
     public void warn(Object o, String msg, Enum warningEnum) {
-        String displayMessage = o.getClass().getName() + ": " + msg;
-        
+        String className = o.getClass().getName();
+        String displayMessage = className + "(" + warningEnum + "): " + msg;
+
         if (getAggregate()) {
             if (reporter != null) {
-                reporter.getCounter(warningEnum).increment(1);
+                // log at least once (and again whenever the message for this object changes)
+                if (msgMap.get(o) == null || !msgMap.get(o).equals(displayMessage)) {
+                    log.warn(displayMessage);
+                    msgMap.put(o, displayMessage);
+                }
+                if (o instanceof EvalFunc || o instanceof LoadFunc || o instanceof StoreFunc) {
+                    reporter.getCounter(className, warningEnum.name()).increment(1);
+                } else {
+                    reporter.getCounter(warningEnum).increment(1);
+                }
             } else {
                 //TODO:
                 //in local mode of execution if the PigHadoopLogger is used initially,
-                //then aggregation cannot be performed as the reporter will be null. 
-                //The reference to a reporter is given by Hadoop at run time. 
+                //then aggregation cannot be performed as the reporter will be null.
+                //The reference to a reporter is given by Hadoop at run time.
                 //In local mode, due to the absence of Hadoop there will be no reporter
                 //Just print the warning message as is.
                 //If a warning message is printed in map reduce mode when aggregation
@@ -68,16 +85,16 @@ public final class PigHadoopLogger imple
         } else {
             log.warn(displayMessage);
         }
-    }    
+    }
 
     public synchronized void setReporter(PigStatusReporter rep) {
         this.reporter = rep;
     }
-    
+
     public synchronized boolean getAggregate() {
         return aggregate;
     }
-    
+
     public synchronized void setAggregate(boolean aggregate) {
         this.aggregate = aggregate;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Mon Feb 24 21:41:38 2014
@@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -56,14 +55,6 @@ public class PigInputFormat extends Inpu
     public static final Log log = LogFactory
             .getLog(PigInputFormat.class);
 
-    private static final PathFilter hiddenFileFilter = new PathFilter() {
-        @Override
-        public boolean accept(Path p) {
-            String name = p.getName();
-            return !name.startsWith("_") && !name.startsWith(".");
-        }
-    };
-
     public static final String PIG_INPUTS = "pig.inputs";
 
     /**
@@ -77,6 +68,7 @@ public class PigInputFormat extends Inpu
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
      */
+    @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public org.apache.hadoop.mapreduce.RecordReader<Text, Tuple> createRecordReader(
             org.apache.hadoop.mapreduce.InputSplit split,
@@ -127,7 +119,7 @@ public class PigInputFormat extends Inpu
      * @throws IOException
      */
     static void mergeSplitSpecificConf(LoadFunc loadFunc, PigSplit pigSplit, Configuration originalConf)
-    throws IOException {
+            throws IOException {
         // set up conf with entries from input specific conf
         Job job = new Job(originalConf);
         loadFunc.setLocation(getLoadLocation(pigSplit.getInputIndex(),
@@ -147,8 +139,8 @@ public class PigInputFormat extends Inpu
     @SuppressWarnings("unchecked")
     private static LoadFunc getLoadFunc(int inputIndex, Configuration conf) throws IOException {
         ArrayList<FileSpec> inputs =
-            (ArrayList<FileSpec>) ObjectSerializer.deserialize(
-                    conf.get(PIG_INPUTS));
+                (ArrayList<FileSpec>) ObjectSerializer.deserialize(
+                        conf.get(PIG_INPUTS));
         FuncSpec loadFuncSpec = inputs.get(inputIndex).getFuncSpec();
         return (LoadFunc) PigContext.instantiateFuncFromSpec(loadFuncSpec);
     }
@@ -156,8 +148,8 @@ public class PigInputFormat extends Inpu
     @SuppressWarnings("unchecked")
     private static String getLoadLocation(int inputIndex, Configuration conf) throws IOException {
         ArrayList<FileSpec> inputs =
-            (ArrayList<FileSpec>) ObjectSerializer.deserialize(
-                    conf.get(PIG_INPUTS));
+                (ArrayList<FileSpec>) ObjectSerializer.deserialize(
+                        conf.get(PIG_INPUTS));
         return inputs.get(inputIndex).getFileName();
     }
 
@@ -174,8 +166,8 @@ public class PigInputFormat extends Inpu
     static void passLoadSignature(LoadFunc loadFunc, int inputIndex,
             Configuration conf) throws IOException {
         List<String> inpSignatureLists =
-            (ArrayList<String>)ObjectSerializer.deserialize(
-                    conf.get("pig.inpSignatures"));
+                (ArrayList<String>)ObjectSerializer.deserialize(
+                        conf.get("pig.inpSignatures"));
         // signature can be null for intermediate jobs where it will not
         // be required to be passed down
         if(inpSignatureLists.get(inputIndex) != null) {
@@ -189,10 +181,10 @@ public class PigInputFormat extends Inpu
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)
      */
-    @SuppressWarnings("unchecked")
+    @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     public List<InputSplit> getSplits(JobContext jobcontext)
-                        throws IOException, InterruptedException {
+            throws IOException, InterruptedException {
 
         Configuration conf = jobcontext.getConfiguration();
 
@@ -250,8 +242,8 @@ public class PigInputFormat extends Inpu
                 LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(
                         loadFuncSpec);
                 boolean combinable = !(loadFunc instanceof MergeJoinIndexer
-                                    || loadFunc instanceof IndexableLoadFunc
-                                    || (loadFunc instanceof CollectableLoadFunc && loadFunc instanceof OrderedLoadFunc));
+                        || loadFunc instanceof IndexableLoadFunc
+                        || (loadFunc instanceof CollectableLoadFunc && loadFunc instanceof OrderedLoadFunc));
                 if (combinable)
                     combinable = !conf.getBoolean("pig.noSplitCombination", false);
                 JobConf confClone = new JobConf(conf);
@@ -280,9 +272,9 @@ public class PigInputFormat extends Inpu
                 int errCode = 2118;
                 String msg = "Unable to create input splits for: " + inputs.get(i).getFileName();
                 if(e.getMessage() !=null && (!e.getMessage().isEmpty()) ){
-                	throw new ExecException(e.getMessage(), errCode, PigException.BUG, e);
+                    throw new ExecException(e.getMessage(), errCode, PigException.BUG, e);
                 }else{
-                	throw new ExecException(msg, errCode, PigException.BUG, e);
+                    throw new ExecException(msg, errCode, PigException.BUG, e);
                 }
             }
         }
@@ -311,7 +303,7 @@ public class PigInputFormat extends Inpu
 
     protected List<InputSplit> getPigSplits(List<InputSplit> oneInputSplits,
             int inputIndex, ArrayList<OperatorKey> targetOps, long blockSize, boolean combinable, Configuration conf)
-            throws IOException, InterruptedException {
+                    throws IOException, InterruptedException {
         ArrayList<InputSplit> pigSplits = new ArrayList<InputSplit>();
         if (!combinable) {
             int splitIndex = 0;
@@ -328,7 +320,7 @@ public class PigInputFormat extends Inpu
                 // default is the block size
                 maxCombinedSplitSize = blockSize;
             List<List<InputSplit>> combinedSplits =
-                MapRedUtil.getCombinePigSplits(oneInputSplits, maxCombinedSplitSize, conf);
+                    MapRedUtil.getCombinePigSplits(oneInputSplits, maxCombinedSplitSize, conf);
             for (int i = 0; i < combinedSplits.size(); i++)
                 pigSplits.add(createPigSplit(combinedSplits.get(i), inputIndex, targetOps, i, conf));
             return pigSplits;
@@ -336,7 +328,7 @@ public class PigInputFormat extends Inpu
     }
 
     private InputSplit createPigSplit(List<InputSplit> combinedSplits,
-        int inputIndex, ArrayList<OperatorKey> targetOps, int splitIndex, Configuration conf)
+            int inputIndex, ArrayList<OperatorKey> targetOps, int splitIndex, Configuration conf)
     {
         PigSplit pigSplit = new PigSplit(combinedSplits.toArray(new InputSplit[0]), inputIndex, targetOps, splitIndex);
         pigSplit.setConf(conf);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java Mon Feb 24 21:41:38 2014
@@ -22,18 +22,22 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.OverwritableStoreFunc;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -204,7 +208,21 @@ public class PigOutputFormat extends Out
             
             // The above call should have update the conf in the JobContext
             // to have the output location - now call checkOutputSpecs()
-            of.checkOutputSpecs(jobContextCopy);
+            try {
+                of.checkOutputSpecs(jobContextCopy);
+            } catch (IOException ioe) {
+                boolean shouldThrowException = true;
+                if (sFunc instanceof OverwritableStoreFunc) {
+                    if (((OverwritableStoreFunc) sFunc).shouldOverwrite()) {
+                        if (ioe instanceof FileAlreadyExistsException
+                                || ioe instanceof org.apache.hadoop.fs.FileAlreadyExistsException) {
+                            shouldThrowException = false;
+                        }
+                    }
+                }
+                if (shouldThrowException)
+                    throw ioe;
+            }
         }
     }
     /**

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Feb 24 21:41:38 2014
@@ -125,11 +125,7 @@ public class POUserFunc extends Expressi
     private void instantiateFunc(FuncSpec fSpec) {
         this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(fSpec);
         this.setSignature(signature);
-        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
-    	Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
-
-    	if(tmpS!=null)
-    		this.func.setInputSchema(tmpS);
+        this.setFuncInputSchema(signature);
         if (func.getClass().isAnnotationPresent(MonitoredUDF.class)) {
             executor = new MonitoredUDFExecutor(func);
         }
@@ -609,4 +605,17 @@ public class POUserFunc extends Expressi
             this.func.setUDFContextSignature(signature);
         }
     }
+
+    /**
+     * Passes func the input schema stored in its UDF properties for this signature, if any.
+     * @param signature suffix of the "pig.evalfunc.inputschema." property key to look up
+     */
+    public void setFuncInputSchema(String signature) {
+        Properties props = UDFContext.getUDFContext().getUDFProperties(func.getClass());
+        Schema tmpS=(Schema)props.get("pig.evalfunc.inputschema."+signature);
+        if(tmpS!=null) {
+            this.func.setInputSchema(tmpS);
+        }
+    }
+
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Mon Feb 24 21:41:38 2014
@@ -369,7 +369,7 @@ public class POPartialAgg extends Physic
             Result res = getOutput(entry.getKey(), valueTuple);
             iter.remove();
             addKeyValToMap(toMap, entry.getKey(), getAggResultTuple(res.result));
-            numEntriesInTarget += valueTuple.size() - 1;
+            numEntriesInTarget++;
         }
         return numEntriesInTarget;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Feb 24 21:41:38 2014
@@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -32,9 +31,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -62,6 +59,14 @@ public class POStream extends PhysicalOp
 
     protected boolean allOutputFromBinaryProcessed = false;
 
+    /**
+     * This flag indicates whether streaming is done through fetching. If set,
+     * {@link FetchLauncher} pulls out the data from the pipeline. Therefore we need to
+     * skip the case in {@link #getNextTuple()} which is called by map() or reduce() when
+     * processing the next tuple.
+     */
+    private boolean isFetchable;
+
     public POStream(OperatorKey k, ExecutableManager executableManager, 
                       StreamingCommand command, Properties properties) {
         super(k);
@@ -170,7 +175,7 @@ public class POStream extends PhysicalOp
             // if we are here, we haven't consumed all input to be sent
             // to the streaming binary - check if we are being called
             // from close() on the map or reduce
-            if(this.parentPlan.endOfAllInput) {
+            if(isFetchable || this.parentPlan.endOfAllInput) {
                 Result r = getNextHelper((Tuple)null);
                 if(r.returnStatus == POStatus.STATUS_EOP) {
                     // we have now seen *ALL* possible input
@@ -373,4 +378,19 @@ public class POStream extends PhysicalOp
       }
       return (Tuple) out;
     }
+
+    /**
+     * @return the value of the isFetchable flag, i.e. true if streaming is done through fetching
+     */
+    public boolean isFetchable() {
+        return isFetchable;
+    }
+
+    /**
+     * @param isFetchable new value of the flag - whether fetching is applied on POStream
+     */
+    public void setFetchable(boolean isFetchable) {
+        this.isFetchable = isFetchable;
+    }
+
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Feb 24 21:41:38 2014
@@ -42,7 +42,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.BinSedesTuple;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.ReadScalarsTez;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.tez.common.TezUtils;

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java?rev=1571454&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java Mon Feb 24 21:41:38 2014
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * EvalFunc that returns one field of a scalar tuple obtained from a Tez input.
+ * The tuple is read in {@link #attachInputs} from the input named by the
+ * constructor argument, or taken from {@link ObjectCache} when a value is
+ * already stored there under the key "scalar-" + inputKey.
+ */
+public class ReadScalarsTez extends EvalFunc<Object> implements TezLoad {
+    private static final Log LOG = LogFactory.getLog(ReadScalarsTez.class);
+    private String inputKey;
+    private transient Tuple t;
+    private transient LogicalInput input;
+
+    /**
+     * @param inputKey key used to look up this scalar's input in the map
+     *                 passed to {@link #attachInputs}
+     */
+    public ReadScalarsTez(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    /**
+     * Adds this function's input key to {@code inputsToSkip} when the scalar
+     * is already in the object cache, in which case attachInputs does not read it.
+     */
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        String cacheKey = "scalar-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+
+    /**
+     * Loads the scalar tuple from the cache or, failing that, from the input
+     * registered under {@code inputKey}, then caches whatever was read.
+     *
+     * @throws ExecException if the input is missing, yields more than one
+     *                       row, or cannot be read
+     */
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf) throws ExecException {
+        String cacheKey = "scalar-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            t = (Tuple) cacheValue;
+            return;
+        }
+        input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            KeyValueReader reader = (KeyValueReader) input.getReader();
+            if (reader.next()) {
+                t = (Tuple) reader.getCurrentValue();
+                if (reader.next()) {
+                    String msg = "Scalar has more than one row in the output. "
+                            + "1st : " + t + ", 2nd :"
+                            + reader.getCurrentValue();
+                    throw new ExecException(msg);
+                }
+            } else {
+                LOG.info("Scalar input from vertex " + inputKey + " is null");
+            }
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+        } catch (ExecException e) {
+            // Already the right type (e.g. the multi-row error above); do not re-wrap it.
+            throw e;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+        ObjectCache.getInstance().cache(cacheKey, t);
+        LOG.info("Cached scalar in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
+    }
+
+    /**
+     * Returns one field of the scalar tuple.
+     *
+     * @param input tuple whose first field is the Integer position to read
+     * @return the field at that position, or null when the scalar input
+     *         produced no row
+     */
+    @Override
+    public Object exec(Tuple input) throws IOException {
+        if (t == null) {
+            // attachInputs found no row; yield a null scalar instead of failing with an NPE.
+            return null;
+        }
+        int pos = (Integer) input.get(0);
+        return t.get(pos);
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Feb 24 21:41:38 2014
@@ -85,7 +85,6 @@ import org.apache.pig.impl.builtin.Defau
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.GetMemNumRows;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
-import org.apache.pig.impl.builtin.ReadScalarsTez;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -236,22 +235,22 @@ public class TezCompiler extends PhyPlan
             }
             tezOper.setClosed(true);
         }
-        
+
         fixScalar();
 
         return tezPlan;
     }
-    
+
     private void fixScalar() throws VisitorException, PlanException {
         // Mapping POStore to POValueOuptut
         Map<POStore, POValueOutputTez> storeSeen = new HashMap<POStore, POValueOutputTez>();
-        
+
         for (TezOperator tezOp : tezPlan) {
             List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
             for (POUserFunc userFunc : userFuncs) {
                 if (userFunc.getReferencedOperator()!=null) {  // Scalar
                     POStore store = (POStore)userFunc.getReferencedOperator();
- 
+
                     TezOperator from = phyToTezOpMap.get(store);
 
                     FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
@@ -323,7 +322,7 @@ public class TezCompiler extends PhyPlan
                 POValueOutputTez valueOutput = new POValueOutputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
                 storeTezOper.plan.addAsLeaf(valueOutput);
                 storeTezOper.setSplitter(true);
-                
+
                 // Create a splittee of store only
                 TezOperator storeOnlyTezOperator = getTezOp();
                 PhysicalPlan storeOnlyPhyPlan = new PhysicalPlan();
@@ -334,7 +333,7 @@ public class TezCompiler extends PhyPlan
                 storeOnlyTezOperator.plan = storeOnlyPhyPlan;
                 tezPlan.add(storeOnlyTezOperator);
                 phyToTezOpMap.put(store, storeOnlyTezOperator);
-                
+
                 // Create new operator as second splittee
                 curTezOp = getTezOp();
                 POValueInputTez valueInput2 = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
@@ -348,13 +347,13 @@ public class TezCompiler extends PhyPlan
                 edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
                 edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
                 storeOnlyTezOperator.setRequestedParallelismByReference(storeTezOper);
-                
+
                 edge = TezCompilerUtil.connect(tezPlan, storeTezOper, curTezOp);
                 edge.dataMovementType = DataMovementType.ONE_TO_ONE;
                 edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
                 edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
                 curTezOp.setRequestedParallelismByReference(storeTezOper);
-                
+
                 return;
             }
 
@@ -1723,7 +1722,7 @@ public class TezCompiler extends PhyPlan
         oper2.setGlobalSort(true);
         opers[1] = oper2;
         tezPlan.add(oper2);
-        
+
         long limit = sort.getLimit();
         //TODO: TezOperator limit not used at all
         oper2.limit = limit;
@@ -1738,7 +1737,7 @@ public class TezCompiler extends PhyPlan
             }
             oper2.setSortOrder(sortOrder);
         }
-        
+
         identityInOutTez.setOutputKey(oper2.getOperatorKey().toString());
 
         if (limit!=-1) {
@@ -1982,7 +1981,7 @@ public class TezCompiler extends PhyPlan
             curTezOp.plan.add(pkg);
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             phyToTezOpMap.put(op, curTezOp);
-            // TODO: Use alias vertex that is introduced by TEZ-678 
+            // TODO: Use alias vertex that is introduced by TEZ-678
         } catch (Exception e) {
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Feb 24 21:41:38 2014
@@ -201,7 +201,7 @@ public class TezDagBuilder extends TezOp
                 break;
             }
         }
-        
+
         List<POValueOutputTez> valueOutputs = PlanHelper.getPhysicalOperators(from.plan,
                 POValueOutputTez.class);
         if (!valueOutputs.isEmpty()) {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Feb 24 21:41:38 2014
@@ -168,7 +168,7 @@ public class TezLauncher extends Launche
             SecondaryKeyOptimizerTez skOptimizer = new SecondaryKeyOptimizerTez(tezPlan);
             skOptimizer.visit();
         }
-        
+
         boolean isMultiQuery =
                 "true".equalsIgnoreCase(pc.getProperties().getProperty(PigConfiguration.OPT_MULTIQUERY, "true"));
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Mon Feb 24 21:41:38 2014
@@ -41,6 +41,7 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -175,6 +176,48 @@ public class MapRedUtil {
         }
     }
     
+    /**
+     * Sets up output and log dir paths for a single-store streaming job
+     *
+     * @param st - POStore of the current job; its file name becomes "pig.streaming.task.output.dir"
+     * @param pigContext - used to obtain a temporary path for the log dir when the store location has a non-hdfs scheme
+     * @param conf - configuration that receives "pig.streaming.log.dir" and "pig.streaming.task.output.dir"
+     * @throws IOException NOTE(review): presumably raised by FileLocalizer.getTemporaryPath - confirm
+     */
+    public static void setupStreamingDirsConfSingle(POStore st, PigContext pigContext,
+            Configuration conf) throws IOException {
+        // log dir goes under the store location when it has no scheme or is on hdfs://, else under a temporary path
+        String outputPathString = st.getSFile().getFileName();
+        if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
+            conf.set("pig.streaming.log.dir",
+                    new Path(outputPathString, JobControlCompiler.LOG_DIR).toString());
+        }
+        else {
+            String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString();
+            Path tmpLocation = new Path(tmpLocationStr);
+            conf.set("pig.streaming.log.dir",
+                    new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+        }
+        conf.set("pig.streaming.task.output.dir", outputPathString);
+    }
+
+    /**
+     * Sets up output and log dir paths for a multi-store streaming job
+     *
+     * @param pigContext - used to obtain the temporary path that both dirs are placed under
+     * @param conf - configuration that receives "pig.streaming.log.dir" and "pig.streaming.task.output.dir"
+     * @throws IOException NOTE(review): presumably raised by FileLocalizer.getTemporaryPath - confirm
+     */
+    public static void setupStreamingDirsConfMulti(PigContext pigContext, Configuration conf)
+            throws IOException {
+
+        String tmpLocationStr = FileLocalizer.getTemporaryPath(pigContext).toString();
+        Path tmpLocation = new Path(tmpLocationStr);
+        conf.set("pig.streaming.log.dir",
+                new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+        conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
+    }
+
     public static FileSpec checkLeafIsStore(
             PhysicalPlan plan,
             PigContext pigContext) throws ExecException {