Posted to commits@pig.apache.org by ch...@apache.org on 2014/02/24 22:41:41 UTC

svn commit: r1571454 [4/5] - in /pig/branches/tez: ./ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/ contrib/piggybank/java/src/main/java/...

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon Feb 24 21:41:38 2014
@@ -37,6 +37,8 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
@@ -166,7 +168,7 @@ public class HBaseStorage extends LoadFu
     private final long limit_;
     private final boolean cacheBlocks_;
     private final int caching_;
-    private final boolean noWAL_;
+    private boolean noWAL_;
     private final long minTimestamp_;
     private final long maxTimestamp_;
     private final long timestamp_;
@@ -183,7 +185,8 @@ public class HBaseStorage extends LoadFu
     private RequiredFieldList requiredFieldList;
 
     private static void populateValidOptions() {
-        validOptions_.addOption("loadKey", false, "Load Key");
+        Option loadKey = OptionBuilder.hasOptionalArgs(1).withArgName("loadKey").withLongOpt("loadKey").withDescription("Load Key").create();
+        validOptions_.addOption(loadKey);
         validOptions_.addOption("gt", true, "Records must be greater than this value " +
                 "(binary, double-slash-escaped)");
         validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
@@ -197,11 +200,11 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when parsing columns");
         validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
                 "HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
-        validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).");
+        Option noWal = OptionBuilder.hasOptionalArgs(1).withArgName("noWAL").withLongOpt("noWAL").withDescription("Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).").create();
+        validOptions_.addOption(noWal);
         validOptions_.addOption("minTimestamp", true, "Record must have timestamp greater or equal to this value");
         validOptions_.addOption("maxTimestamp", true, "Record must have timestamp less then this value");
         validOptions_.addOption("timestamp", true, "Record must have timestamp equal to this value");
-
     }
 
     /**
@@ -263,7 +266,13 @@ public class HBaseStorage extends LoadFu
             throw e;
         }
 
-        loadRowKey_ = configuredOptions_.hasOption("loadKey");
+		loadRowKey_ = false;
+		if (configuredOptions_.hasOption("loadKey")) {
+			String value = configuredOptions_.getOptionValue("loadKey");
+			if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) {//the empty string and null check is for backward compat.
+				loadRowKey_ = true;
+			}
+		}
 
         delimiter_ = ",";
         if (configuredOptions_.getOptionValue("delim") != null) {
@@ -302,7 +311,13 @@ public class HBaseStorage extends LoadFu
         caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
         cacheBlocks_ = Boolean.valueOf(configuredOptions_.getOptionValue("cacheBlocks", "false"));
         limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
-        noWAL_ = configuredOptions_.hasOption("noWAL");
+        noWAL_ = false;
+		if (configuredOptions_.hasOption("noWAL")) {
+			String value = configuredOptions_.getOptionValue("noWAL");
+			if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null) {//the empty string and null check is for backward compat.
+				noWAL_ = true;
+			}
+		}        
 
         if (configuredOptions_.hasOption("minTimestamp")){
             minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));

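The HBaseStorage hunk above turns -loadKey and -noWAL into commons-cli options that take one optional argument, so both the bare flag and an explicit true/false value parse, with the bare/empty form kept for backward compatibility. A minimal standalone sketch of that parsing pattern (the argument array is invented for illustration):

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.OptionBuilder;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    public class OptionalArgDemo {
        public static void main(String[] args) throws ParseException {
            Options opts = new Options();
            // Same pattern as the diff: a long option that may carry one value.
            Option loadKey = OptionBuilder.hasOptionalArgs(1).withArgName("loadKey")
                    .withLongOpt("loadKey").withDescription("Load Key").create();
            opts.addOption(loadKey);

            // "-loadKey" alone and "-loadKey false" both parse; getOptionValue()
            // returns null in the first case and "false" in the second.
            CommandLine cmd = new GnuParser().parse(opts, new String[] { "-loadKey", "false" });
            String value = cmd.getOptionValue("loadKey");
            boolean on = cmd.hasOption("loadKey")
                    && (value == null || value.isEmpty() || "true".equalsIgnoreCase(value));
            System.out.println("loadKey enabled: " + on);  // false for this argument array
        }
    }
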
Modified: pig/branches/tez/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/BinStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/BinStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/BinStorage.java Mon Feb 24 21:41:38 2014
@@ -143,8 +143,8 @@ implements StoreFuncInterface, LoadMetad
     private static final Log mLog = LogFactory.getLog(BinStorage.class);
     protected long                end            = Long.MAX_VALUE;
 
-    static String casterString = null;
-    static LoadCaster caster = null;
+    private String casterString = null;
+    private LoadCaster caster = null;
 
     private BinStorageRecordReader recReader = null;
     private BinStorageRecordWriter recWriter = null;

Modified: pig/branches/tez/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/JsonStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/JsonStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/JsonStorage.java Mon Feb 24 21:41:38 2014
@@ -113,7 +113,7 @@ public class JsonStorage extends StoreFu
         UDFContext udfc = UDFContext.getUDFContext();
         Properties p =
             udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
-        p.setProperty(SCHEMA_SIGNATURE, s.toString());
+        p.setProperty(SCHEMA_SIGNATURE, fixSchema(s).toString());
     }
 
 
@@ -310,4 +310,12 @@ public class JsonStorage extends StoreFu
         metadataWriter.storeSchema(schema, location, job);
     }
 
+    public ResourceSchema fixSchema(ResourceSchema s){
+      for (ResourceFieldSchema filed : s.getFields()) {
+        if(filed.getType() == DataType.NULL)
+          filed.setType(DataType.BYTEARRAY);
+      }
+      return s;
+    }
+
 }

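The JsonStorage hunk above adds fixSchema(), which rewrites NULL-typed fields to bytearray before the schema is stashed in the UDFContext. A hedged sketch of the same substitution against the ResourceSchema API; the one-field schema here is invented for illustration:

    import org.apache.pig.ResourceSchema;
    import org.apache.pig.ResourceSchema.ResourceFieldSchema;
    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class FixSchemaDemo {
        public static void main(String[] args) throws Exception {
            // Hypothetical schema with one untyped (NULL) field, the case fixSchema() targets.
            Schema raw = new Schema(new Schema.FieldSchema("b", DataType.NULL));
            ResourceSchema s = new ResourceSchema(raw);
            for (ResourceFieldSchema field : s.getFields()) {
                if (field.getType() == DataType.NULL) {
                    field.setType(DataType.BYTEARRAY);  // same substitution the patch makes
                }
            }
            System.out.println(s);  // field "b" now reports type bytearray
        }
    }
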
Modified: pig/branches/tez/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/PigStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/PigStorage.java Mon Feb 24 21:41:38 2014
@@ -27,10 +27,14 @@ import org.apache.commons.cli.CommandLin
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.BZip2Codec;
@@ -50,6 +54,7 @@ import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
 import org.apache.pig.LoadPushDown;
+import org.apache.pig.OverwritableStoreFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -61,10 +66,12 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.CastUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -125,7 +132,7 @@ import org.apache.pig.parser.ParserExcep
  */
 @SuppressWarnings("unchecked")
 public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface,
-LoadPushDown, LoadMetadata, StoreMetadata {
+LoadPushDown, LoadMetadata, StoreMetadata, OverwritableStoreFunc {
     protected RecordReader in = null;
     protected RecordWriter writer = null;
     protected final Log mLog = LogFactory.getLog(getClass());
@@ -138,6 +145,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
 
     boolean isSchemaOn = false;
     boolean dontLoadSchema = false;
+    boolean overwriteOutput = false;
     protected ResourceSchema schema;
     protected LoadCaster caster;
 
@@ -161,6 +169,8 @@ LoadPushDown, LoadMetadata, StoreMetadat
         validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
         validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
         validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
+        Option overwrite = OptionBuilder.hasOptionalArgs(1).withArgName("overwrite").withLongOpt("overwrite").withDescription("Overwrites the destination.").create();
+        validOptions.addOption(overwrite);        
     }
 
     public PigStorage() {
@@ -200,6 +210,12 @@ LoadPushDown, LoadMetadata, StoreMetadat
         try {
             configuredOptions = parser.parse(validOptions, optsArr);
             isSchemaOn = configuredOptions.hasOption("schema");
+            if (configuredOptions.hasOption("overwrite")) {
+                String value = configuredOptions.getOptionValue("overwrite");
+                if ("true".equalsIgnoreCase(value)) {
+                    overwriteOutput = true;
+                }
+            }       
             dontLoadSchema = configuredOptions.hasOption("noschema");
             tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
             tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
@@ -307,8 +323,12 @@ LoadPushDown, LoadMetadata, StoreMetadat
             // only contains required fields.
             // We walk the requiredColumns array to find required fields,
             // and cast those.
-            for (int i = 0; i < Math.min(fieldSchemas.length, tup.size()); i++) {
+            for (int i = 0; i < fieldSchemas.length; i++) {
                 if (mRequiredColumns == null || (mRequiredColumns.length>i && mRequiredColumns[i])) {
+                    if (tupleIdx >= tup.size()) {
+                        tup.append(null);
+                    }
+                    
                     Object val = null;
                     if(tup.get(tupleIdx) != null){
                         byte[] bytes = ((DataByteArray) tup.get(tupleIdx)).get();
@@ -319,9 +339,6 @@ LoadPushDown, LoadMetadata, StoreMetadat
                     tupleIdx++;
                 }
             }
-            for (int i = tup.size(); i < fieldSchemas.length; i++) {
-                tup.append(null);
-            }
         }
         return tup;
     }
@@ -567,4 +584,24 @@ LoadPushDown, LoadMetadata, StoreMetadat
             Job job) throws IOException {
 
     }
+
+    @Override
+    public boolean shouldOverwrite() {
+        return this.overwriteOutput;
+    }
+
+    @Override
+    public void cleanupOutput(POStore store, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        String output = conf.get("mapred.output.dir");
+        Path outputPath = null;
+        if (output != null)
+            outputPath = new Path(output);
+        FileSystem fs = outputPath.getFileSystem(conf);
+        try {
+            fs.delete(outputPath, true);
+        } catch (Exception e) {
+            mLog.warn("Could not delete output " + output);
+        }
+    }
 }

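The PigStorage hunk above implements OverwritableStoreFunc: when constructed with "-overwrite true", the store func reports shouldOverwrite() == true and the runtime calls cleanupOutput() to delete the existing output directory. A small sketch of the new flag, assuming the constructor arguments shown are just representative:

    import org.apache.pig.builtin.PigStorage;

    public class OverwriteDemo {
        public static void main(String[] args) throws Exception {
            // "-overwrite true" sets the flag that shouldOverwrite() reports;
            // the runtime then invokes cleanupOutput() before the store runs.
            PigStorage overwriting = new PigStorage(",", "-overwrite true");
            PigStorage plain = new PigStorage(",");
            System.out.println(overwriting.shouldOverwrite()); // true
            System.out.println(plain.shouldOverwrite());       // false
        }
    }
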
Modified: pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java Mon Feb 24 21:41:38 2014
@@ -56,6 +56,19 @@ public abstract class SelfSpillBag exten
         private int cacheLimit = Integer.MAX_VALUE;
         private long memUsage = 0;
         private long numObjsSizeChecked = 0;
+        
+        private static float cachedMemUsage = 0.2F;
+        private static long maxMem = 0;
+        static {
+            maxMem = Runtime.getRuntime().maxMemory();
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String usage = PigMapReduce.sJobConfInternal.get().get(
+                        PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+                if (usage != null) {
+                    cachedMemUsage = Float.parseFloat(usage);
+                }
+            }
+        }
 
         /**
          * @param bagCount
@@ -68,18 +81,10 @@ public abstract class SelfSpillBag exten
         private void init(int bagCount, float percent) {
 
             if (percent < 0) {
-                percent = 0.2F;
-                if (PigMapReduce.sJobConfInternal.get() != null) {
-                    String usage = PigMapReduce.sJobConfInternal.get().get(
-                            PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
-                    if (usage != null) {
-                        percent = Float.parseFloat(usage);
-                    }
-                }
+                percent = cachedMemUsage;
             }
 
-            long max = Runtime.getRuntime().maxMemory();
-            maxMemUsage = (long) ((max * percent) / bagCount);
+            maxMemUsage = (long) ((maxMem * percent) / bagCount);
 
             // set limit to 0, if memusage is 0 or really really small.
             // then all tuples are put into disk

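The SelfSpillBag hunk above hoists the Runtime.getRuntime().maxMemory() call and the PigConfiguration.PROP_CACHEDBAG_MEMUSAGE lookup into a static initializer, so init() reduces to one multiplication and division per bag. A tiny sketch of that arithmetic, using the 0.2 default from the diff:

    public class BagMemLimitDemo {
        // Computed once, as the patch now does in a static initializer.
        private static final long MAX_MEM = Runtime.getRuntime().maxMemory();
        private static final float DEFAULT_USAGE = 0.2F;  // default cached-bag memusage

        static long maxMemUsage(int bagCount, float percent) {
            if (percent < 0) {
                percent = DEFAULT_USAGE;
            }
            return (long) ((MAX_MEM * percent) / bagCount);  // same formula as init()
        }

        public static void main(String[] args) {
            System.out.println(maxMemUsage(3, -1));  // a third of 20% of the heap
        }
    }
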
Modified: pig/branches/tez/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/PigContext.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/PigContext.java Mon Feb 24 21:41:38 2014
@@ -23,9 +23,9 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.FileWriter;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.IOException;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.lang.reflect.Constructor;
@@ -118,6 +118,9 @@ public class PigContext implements Seria
     // (some functions may come from pig.jar and we don't want the whole jar file.)
     transient public Vector<String> skipJars = new Vector<String>(2);
 
+    // jars that are predeployed to the cluster and thus should not be merged in at all (even subsets).
+    transient public Vector<String> predeployedJars = new Vector<String>(2);
+    
     // script files that are needed to run a job
     @Deprecated
     public List<String> scriptFiles = new ArrayList<String>();
@@ -145,6 +148,9 @@ public class PigContext implements Seria
     private static ThreadLocal<ArrayList<String>> packageImportList =
         new ThreadLocal<ArrayList<String>>();
 
+    private static ThreadLocal<Map<String,Class<?>>> classCache = 
+        new ThreadLocal<Map<String,Class<?>>>();
+
     private Properties log4jProperties = new Properties();
 
     private Level defaultLogLevel = Level.INFO;
@@ -355,6 +361,17 @@ public class PigContext implements Seria
             Thread.currentThread().setContextClassLoader(PigContext.classloader);
         }
     }
+    
+    /**
+     * Adds the specified path to the predeployed jars list. These jars will 
+     * never be included in generated job jar.
+     * <p>
+     * This can be called for jars that are pre-installed on the Hadoop 
+     * cluster to reduce the size of the job jar.
+     */
+    public void markJarAsPredeployed(String path) {
+        predeployedJars.add(path);
+    }
 
     public String doParamSubstitution(InputStream in,
                                       List<String> params,
@@ -606,12 +623,30 @@ public class PigContext implements Seria
         return new ContextClassLoader(urls, PigContext.class.getClassLoader());
     }
 
+    private static Map<String,Class<?>> getClassCache() {
+        Map<String,Class<?>> c = classCache.get();
+        if (c == null) {
+            c = new HashMap<String,Class<?>>();
+            classCache.set(c);
+        }
+             
+        return c;
+    }
+    
     @SuppressWarnings("rawtypes")
     public static Class resolveClassName(String name) throws IOException{
+        Map<String,Class<?>> cache = getClassCache(); 
+        
+        Class c = cache.get(name);
+        if (c != null) {
+            return c;
+        }
+        
         for(String prefix: getPackageImportList()) {
-            Class c;
             try {
                 c = Class.forName(prefix+name,true, PigContext.classloader);
+                cache.put(name, c);
+                
                 return c;
             }
             catch (ClassNotFoundException e) {

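The PigContext hunk above adds a predeployed-jar list and a per-thread cache for resolveClassName(), so repeated UDF lookups skip Class.forName(). A standalone sketch of the thread-local cache pattern (the class name in main is just an example; Pig's version also walks its import-prefix list):

    import java.util.HashMap;
    import java.util.Map;

    public class ClassCacheDemo {
        private static final ThreadLocal<Map<String, Class<?>>> CACHE =
                new ThreadLocal<Map<String, Class<?>>>();

        private static Map<String, Class<?>> cache() {
            Map<String, Class<?>> c = CACHE.get();
            if (c == null) {
                c = new HashMap<String, Class<?>>();
                CACHE.set(c);
            }
            return c;
        }

        static Class<?> resolve(String name) throws ClassNotFoundException {
            Class<?> c = cache().get(name);
            if (c != null) {
                return c;                      // hit: skip Class.forName entirely
            }
            c = Class.forName(name);           // the patch additionally tries each import prefix
            cache().put(name, c);
            return c;
        }

        public static void main(String[] args) throws Exception {
            System.out.println(resolve("java.lang.String") == resolve("java.lang.String"));  // true
        }
    }
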
Modified: pig/branches/tez/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/PigImplConstants.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/PigImplConstants.java Mon Feb 24 21:41:38 2014
@@ -36,4 +36,9 @@ public class PigImplConstants {
      * the set of disabled optimizer rules.
      */
     public static final String PIG_OPTIMIZER_RULES_KEY = "pig.optimizer.rules";
+
+    /**
+     * Used by pig to indicate that current job has been converted to run in local mode
+     */
+    public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
 }

Modified: pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java Mon Feb 24 21:41:38 2014
@@ -39,8 +39,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Shell;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -51,6 +54,7 @@ import org.apache.pig.backend.datastorag
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.hadoop.datastorage.HDirectory;
 import org.apache.pig.backend.hadoop.datastorage.HPath;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.impl.PigContext;
@@ -64,11 +68,14 @@ public class FileLocalizer {
     static public final int STYLE_UNIX = 0;
     static public final int STYLE_WINDOWS = 1;
 
+    public static FsPermission OWNER_ONLY_PERMS = new FsPermission(FsAction.ALL, FsAction.NONE,
+            FsAction.NONE); // rwx------
+
     public static class DataStorageInputStreamIterator extends InputStream {
         InputStream current;
         ElementDescriptor[] elements;
         int currentElement;
-        
+
         public DataStorageInputStreamIterator(ElementDescriptor[] elements) {
             this.elements = elements;
         }
@@ -164,7 +171,7 @@ public class FileLocalizer {
             throw new RuntimeException(
                     "can't open DFS file while executing locally");
         }
-        
+
         return openDFSFile(fileName, ConfigurationUtil.toProperties(conf));
 
     }
@@ -174,7 +181,7 @@ public class FileLocalizer {
         ElementDescriptor elem = dds.asElement(fileName);
         return openDFSFile(elem);
     }
-    
+
     public static long getSize(String fileName) throws IOException {
     	Configuration conf = PigMapReduce.sJobConfInternal.get();
 
@@ -185,28 +192,28 @@ public class FileLocalizer {
 
         return getSize(fileName, ConfigurationUtil.toProperties(conf));
     }
-    
+
     public static long getSize(String fileName, Properties properties) throws IOException {
     	DataStorage dds = new HDataStorage(properties);
         ElementDescriptor elem = dds.asElement(fileName);
-       
+
         // recursively get all the files under this path
         ElementDescriptor[] allElems = getFileElementDescriptors(elem);
-        
+
         long size = 0;
-        
+
         // add up the sizes of all files found
         for (int i=0; i<allElems.length; i++) {
         	Map<String, Object> stats = allElems[i].getStatistics();
         	size += (Long) (stats.get(ElementDescriptor.LENGTH_KEY));
         }
-        
+
         return size;
     }
-    
+
     private static InputStream openDFSFile(ElementDescriptor elem) throws IOException{
         ElementDescriptor[] elements = null;
-        
+
         if (elem.exists()) {
             try {
                 if(! elem.getDataStorage().isContainer(elem.toString())) {
@@ -218,7 +225,7 @@ public class FileLocalizer {
             catch (DataStorageException e) {
                 throw new IOException("Failed to determine if elem=" + elem + " is container", e);
             }
-            
+
             // elem is a directory - recursively get all files in it
             elements = getFileElementDescriptors(elem);
         } else {
@@ -226,15 +233,15 @@ public class FileLocalizer {
             if (!globMatchesFiles(elem, elem.getDataStorage())) {
                 throw new IOException(elem.toString() + " does not exist");
             } else {
-                elements = getFileElementDescriptors(elem); 
+                elements = getFileElementDescriptors(elem);
                 return new DataStorageInputStreamIterator(elements);
-                
+
             }
         }
-        
+
         return new DataStorageInputStreamIterator(elements);
     }
-    
+
     /**
      * recursively get all "File" element descriptors present in the input element descriptor
      * @param elem input element descriptor
@@ -259,7 +266,7 @@ public class FileLocalizer {
             if (fullPath.systemElement()) {
                 continue;
             }
-            
+
             if (fullPath instanceof ContainerDescriptor) {
                 for (ElementDescriptor child : ((ContainerDescriptor) fullPath)) {
                     paths.add(child);
@@ -274,7 +281,7 @@ public class FileLocalizer {
         filePaths.toArray(elems);
         return elems;
     }
-    
+
     private static InputStream openLFSFile(ElementDescriptor elem) throws IOException{
         // IMPORTANT NOTE: Currently we use HXXX classes to represent
         // files and dirs in local mode - so we can just delegate this
@@ -282,7 +289,7 @@ public class FileLocalizer {
         // and dirs THIS WILL NEED TO CHANGE
         return openDFSFile(elem);
     }
-    
+
     /**
      * This function returns an input stream to a local file system file or
      * a file residing on Hadoop's DFS
@@ -306,7 +313,7 @@ public class FileLocalizer {
             return openLFSFile(elem);
         }
     }
-    
+
     /**
      * @deprecated Use {@link #fullPath(String, PigContext)} instead
      */
@@ -317,7 +324,7 @@ public class FileLocalizer {
             if (fileName.charAt(0) != '/') {
                 ElementDescriptor currentDir = storage.getActiveContainer();
                 ElementDescriptor elem = storage.asElement(currentDir.toString(), fileName);
-                
+
                 fullPath = elem.toString();
             } else {
                 fullPath = fileName;
@@ -327,7 +334,7 @@ public class FileLocalizer {
         }
         return fullPath;
     }
-    
+
     static public InputStream open(String fileSpec, PigContext pigContext) throws IOException {
         fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
         if (!fileSpec.startsWith(LOCAL_PREFIX)) {
@@ -342,35 +349,35 @@ public class FileLocalizer {
             return openLFSFile(elem);
         }
     }
-    
+
     /**
      * @param fileSpec
      * @param offset
      * @param pigContext
      * @return SeekableInputStream
      * @throws IOException
-     * 
+     *
      * This is an overloaded version of open where there is a need to seek in stream. Currently seek is supported
      * only in file, not in directory or glob.
      */
     static public SeekableInputStream open(String fileSpec, long offset, PigContext pigContext) throws IOException {
-        
+
         fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
-        
+
         ElementDescriptor elem;
-        if (!fileSpec.startsWith(LOCAL_PREFIX)) 
+        if (!fileSpec.startsWith(LOCAL_PREFIX))
             elem = pigContext.getDfs().asElement(fullPath(fileSpec, pigContext));
-                
+
         else{
             fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
-            elem = pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));            
+            elem = pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));
         }
-        
+
         if (elem.exists() && (!elem.getDataStorage().isContainer(elem.toString()))) {
             try {
                 if (elem.systemElement())
                     throw new IOException ("Attempt is made to open system file " + elem.toString());
-                
+
                 SeekableInputStream sis = elem.sopen();
                 sis.seek(offset, FLAGS.SEEK_SET);
                 return sis;
@@ -383,7 +390,7 @@ public class FileLocalizer {
         else
             throw new IOException("Currently seek is supported only in a file, not in glob or directory.");
     }
-    
+
     static public OutputStream create(String fileSpec, PigContext pigContext) throws IOException{
         return create(fileSpec,false,pigContext);
     }
@@ -404,11 +411,11 @@ public class FileLocalizer {
                 if (!res)
                     log.warn("FileLocalizer.create: failed to create " + f);
             }
-            
+
             return new FileOutputStream(fileSpec,append);
         }
     }
-    
+
     static public boolean delete(String fileSpec, PigContext pigContext) throws IOException{
         fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
         ElementDescriptor elem = null;
@@ -455,13 +462,28 @@ public class FileLocalizer {
             throws DataStorageException {
 
         if (relativeRoot.get() == null) {
-            String tdir= pigContext.getProperties().getProperty("pig.temp.dir", "/tmp");
-            relativeRoot.set(pigContext.getDfs().asContainer(tdir + "/temp" + r.nextInt()));
+            String tdir= pigContext.getProperties().getProperty(PigConfiguration.PIG_TEMP_DIR, "/tmp");
+            ContainerDescriptor relative = pigContext.getDfs().asContainer(tdir + "/temp" + r.nextInt());
+            relativeRoot.set(relative);
+            try {
+                if (!relative.exists()) {
+                    createRelativeRoot(relative);
+                }
+            } catch (IOException e) {
+                throw new DataStorageException(e);
+            }
         }
 
         return relativeRoot.get();
     }
 
+    private static void createRelativeRoot(ContainerDescriptor relativeRoot) throws IOException {
+        relativeRoot.create();
+        if (relativeRoot instanceof HDirectory) {
+            ((HDirectory) relativeRoot).setPermission(OWNER_ONLY_PERMS);
+        }
+    }
+
     public static void deleteTempFiles() {
         if (relativeRoot.get() != null) {
             try {
@@ -480,9 +502,6 @@ public class FileLocalizer {
     public static Path getTemporaryPath(PigContext pigContext, String suffix) throws IOException {
       ElementDescriptor relative = relativeRoot(pigContext);
 
-      if (!relativeRoot(pigContext).exists()) {
-          relativeRoot(pigContext).create();
-      }
       ElementDescriptor elem=
           pigContext.getDfs().asElement(relative.toString(), "tmp" + r.nextInt() + suffix);
       return ((HPath)elem).getPath();
@@ -495,30 +514,30 @@ public class FileLocalizer {
         if (filename.startsWith(LOCAL_PREFIX)) {
             filename = filename.substring(LOCAL_PREFIX.length());
         }
-        
+
         ElementDescriptor localElem =
             pigContext.getLfs().asElement(filename);
-            
+
         if (!localElem.exists()) {
             throw new FileNotFoundException(filename);
         }
-            
+
         ElementDescriptor distribElem = pigContext.getDfs().asElement(
                 getTemporaryPath(pigContext).toString());
-    
+
         int suffixStart = filename.lastIndexOf('.');
         if (suffixStart != -1) {
             distribElem = pigContext.getDfs().asElement(distribElem.toString() +
                     filename.substring(suffixStart));
         }
-            
+
         // TODO: currently the copy method in Data Storage does not allow to specify overwrite
         //       so the work around is to delete the dst file first, if it exists
         if (distribElem.exists()) {
             distribElem.delete();
         }
         localElem.copy(distribElem, null, false);
-            
+
         return distribElem.toString();
     }
 
@@ -528,7 +547,7 @@ public class FileLocalizer {
                 ElementDescriptor currentDir = pigContext.getDfs().getActiveContainer();
                 ElementDescriptor elem = pigContext.getDfs().asElement(currentDir.toString(),
                                                                                   filename);
-                
+
                 return elem.toString();
             }
             return filename;
@@ -546,7 +565,7 @@ public class FileLocalizer {
     /**
      * @deprecated Use {@link #fileExists(String, PigContext)} instead
      */
-    @Deprecated 
+    @Deprecated
     public static boolean fileExists(String filename, DataStorage store)
             throws IOException {
         ElementDescriptor elem = store.asElement(filename);
@@ -559,7 +578,7 @@ public class FileLocalizer {
     }
 
     /**
-     * @deprecated Use {@link #isFile(String, PigContext)} instead 
+     * @deprecated Use {@link #isFile(String, PigContext)} instead
      */
     @Deprecated
     public static boolean isFile(String filename, DataStorage store)
@@ -593,10 +612,10 @@ public class FileLocalizer {
             switch (elems.length) {
             case 0:
                 return false;
-    
+
             case 1:
                 return !elems[0].equals(elem);
-    
+
             default:
                 return true;
             }
@@ -614,13 +633,13 @@ public class FileLocalizer {
     /**
      * Convert path from Windows convention to Unix convention. Invoked under
      * cygwin.
-     * 
+     *
      * @param path
      *            path in Windows convention
      * @return path in Unix convention, null if fail
      */
     static public String parseCygPath(String path, int style) {
-        String[] command; 
+        String[] command;
         if (style==STYLE_WINDOWS)
             command = new String[] { "cygpath", "-w", path };
         else
@@ -653,7 +672,7 @@ public class FileLocalizer {
         }
         return line;
     }
-    
+
     static File localTempDir = null;
     static {
         File f;
@@ -669,8 +688,8 @@ public class FileLocalizer {
         if (!success) {
           throw new RuntimeException("Error creating FileLocalizer temp directory.");
         }
-    }    
-    
+    }
+
     public static class FetchFileRet {
         public FetchFileRet(File file, boolean didFetch) {
             this.file = file;
@@ -681,9 +700,9 @@ public class FileLocalizer {
     }
 
     /**
-     * Ensures that the passed path is on the local file system, fetching it 
+     * Ensures that the passed path is on the local file system, fetching it
      * to the java.io.tmpdir if necessary. If pig.jars.relative.to.dfs is true
-     * and dfs is not null, then a relative path is assumed to be relative to the passed 
+     * and dfs is not null, then a relative path is assumed to be relative to the passed
      * dfs active directory. Else they are assumed to be relative to the local working
      * directory.
      */
@@ -776,21 +795,21 @@ public class FileLocalizer {
 
         return fetchFiles;
     }
-    
+
     /**
      * Ensures that the passed resource is available from the local file system, fetching
      * it to a temporary directory.
-     * 
-     * @throws ResourceNotFoundException 
+     *
+     * @throws ResourceNotFoundException
      */
     public static FetchFileRet fetchResource(String name) throws IOException, ResourceNotFoundException {
       FetchFileRet localFileRet = null;
       InputStream resourceStream = PigContext.getClassLoader().getResourceAsStream(name);
-      if (resourceStream != null) {        
+      if (resourceStream != null) {
         File dest = new File(localTempDir, name);
-        dest.getParentFile().mkdirs();        
+        dest.getParentFile().mkdirs();
         dest.deleteOnExit();
-                
+
         OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(dest));
         byte[] buffer = new byte[1024];
         int len;
@@ -798,14 +817,14 @@ public class FileLocalizer {
           outputStream.write(buffer,0,len);
         }
         outputStream.close();
-        
+
         localFileRet = new FetchFileRet(dest,false);
       }
       else
       {
         throw new ResourceNotFoundException(name);
       }
-      
+
       return localFileRet;
     }
 }

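The FileLocalizer hunk above creates the temporary scratch root eagerly and, when it is an HDirectory, applies owner-only permissions (OWNER_ONLY_PERMS, rwx------) so per-job temp data is not world-readable. A hedged sketch of the equivalent call against the plain Hadoop FileSystem API; the path is made up, and the patch itself goes through Pig's HDirectory wrapper rather than FileSystem directly:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class OwnerOnlyTempDir {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            // rwx------ : the same FsPermission the patch defines as OWNER_ONLY_PERMS.
            FsPermission ownerOnly = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
            Path scratch = new Path("/tmp/temp-example");   // illustrative location
            fs.mkdirs(scratch, ownerOnly);
        }
    }
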
Modified: pig/branches/tez/src/org/apache/pig/impl/io/SequenceFileInterStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/SequenceFileInterStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/SequenceFileInterStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/SequenceFileInterStorage.java Mon Feb 24 21:41:38 2014
@@ -205,6 +205,8 @@ StoreFuncInterface, LoadMetadata {
 
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
+        Configuration conf = job.getConfiguration();
+        Utils.setMapredCompressionCodecProps(conf);
         FileOutputFormat.setOutputPath(job, new Path(location));
     }
 

Modified: pig/branches/tez/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/plan/CompilationMessageCollector.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/plan/CompilationMessageCollector.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/plan/CompilationMessageCollector.java Mon Feb 24 21:41:38 2014
@@ -26,11 +26,11 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.pig.PigWarning;
 /***
- * This class is used for collecting all messages (error + warning) in 
- * compilation process. These messages are reported back to users 
+ * This class is used for collecting all messages (error + warning) in
+ * compilation process. These messages are reported back to users
  * at the end of compilation.
- * 
- * iterator() has to be called after CompilationMessageCollector is fully 
+ *
+ * iterator() has to be called after CompilationMessageCollector is fully
  * populated otherwise the state is undefined.
  */
 public class CompilationMessageCollector implements Iterable<CompilationMessageCollector.Message> {
@@ -41,49 +41,57 @@ public class CompilationMessageCollector
         Warning,
         Info
     }
-    
+
+    public enum Unknown {
+       UNKNOWN_MESSAGE_KIND;
+       public String toString() {
+         return "Aggregated unknown kind messages.  Please set -Daggregate.warning=false to retrieve these messages";
+       }
+    }
+
     public static class Message {
         private String msg = null ;
         private MessageType msgType = MessageType.Unknown ;
         private Enum kind = null;
-        
+
         public Message(String message, MessageType messageType) {
             msg = message ;
             msgType = messageType ;
         }
-        
+
         public Message(String message, MessageType messageType, Enum kind) {
-        	this(message, messageType);
-        	this.kind = kind;
+            this(message, messageType);
+            this.kind = kind;
         }
-        
+
         public String getMessage() {
             return msg ;
         }
-        
+
         public MessageType getMessageType() {
             return msgType ;
         }
-        
+
         public Enum getKind() {
-        	return kind;
+            return kind;
         }
     }
-    
+
     private List<Message> messageList = new ArrayList<Message>() ;
-    
+
     public CompilationMessageCollector() {
         // nothing here
     }
-    
+
     public void collect(String message, MessageType messageType) {
-        messageList.add(new Message(message, messageType)) ;
+        messageList.add(new Message(message, messageType,
+                                    Unknown.UNKNOWN_MESSAGE_KIND)) ;
     }
-    
+
     public void collect(String message, MessageType messageType, Enum kind) {
         messageList.add(new Message(message, messageType, kind)) ;
     }
-    
+
     protected boolean hasMessageType(MessageType messageType) {
         Iterator<Message> iter = iterator() ;
         while(iter.hasNext()) {
@@ -93,48 +101,48 @@ public class CompilationMessageCollector
         }
         return false ;
     }
-    
+
     public boolean hasError() {
-    	return hasMessageType(MessageType.Error);
+        return hasMessageType(MessageType.Error);
     }
 
     public Iterator<Message> iterator() {
         return messageList.iterator() ;
     }
-    
+
     public boolean hasMessage() {
         return messageList.size() > 0 ;
     }
-    
+
     public int size() {
         return messageList.size() ;
     }
-    
+
     public Message get(int i) {
         return messageList.get(i) ;
     }
-    
+
     public Map<Enum, Long> getKindAggregate(MessageType messageType) {
-    	Map<Enum, Long> aggMap = new HashMap<Enum, Long>();
+        Map<Enum, Long> aggMap = new HashMap<Enum, Long>();
         Iterator<Message> iter = iterator() ;
         while(iter.hasNext()) {
-        	Message message = iter.next(); 
+            Message message = iter.next();
             if (message.getMessageType() == messageType) {
-            	Enum kind = message.getKind();
-            	if(kind != null) {
-            		Long count = aggMap.get(kind);
-            		count = (count == null? 1 : ++count);
-            		aggMap.put(kind, count);
-            	}
+                Enum kind = message.getKind();
+                if(kind != null) {
+                    Long count = aggMap.get(kind);
+                    count = (count == null? 1 : ++count);
+                    aggMap.put(kind, count);
+              }
             }
-    	}
-    	return aggMap;
+        }
+        return aggMap;
     }
-    
+
     public static void logAggregate(Map<Enum, Long> aggMap, MessageType messageType, Log log) {
         long nullCounterCount = aggMap.get(PigWarning.NULL_COUNTER_COUNT)==null?0 : aggMap.get(PigWarning.NULL_COUNTER_COUNT);
         if (nullCounterCount!=0 && aggMap.size()>1) // PigWarning.NULL_COUNTER_COUNT is definitely in appMap
-            logMessage("Unable to retrieve hadoop counter for " + nullCounterCount + 
+            logMessage("Unable to retrieve hadoop counter for " + nullCounterCount +
                     " jobs, the number following warnings may not be correct", messageType, log);
         for(Map.Entry<Enum, Long> e: aggMap.entrySet()) {
             if (e.getKey() !=PigWarning.NULL_COUNTER_COUNT)
@@ -145,52 +153,52 @@ public class CompilationMessageCollector
                     logMessage(message, messageType, log);
                 }
             }
-    	}	
+        }
     }
-    
-    public static void logMessages(CompilationMessageCollector messageCollector, 
-    		MessageType messageType, boolean aggregate, Log log) {
-    	if(aggregate) {
-    		Map<Enum, Long> aggMap = messageCollector.getKindAggregate(messageType);
-    		logAggregate(aggMap, messageType, log);
-    	} else {
-    		Iterator<Message> messageIter = messageCollector.iterator();
-    		while(messageIter.hasNext()) {
-    			Message message = messageIter.next();
-    			if(message.getMessageType() == messageType) {
-    				logMessage(message.getMessage(), messageType, log);
-    			}
-    		}
-    	}
+
+    public static void logMessages(CompilationMessageCollector messageCollector,
+            MessageType messageType, boolean aggregate, Log log) {
+        if(aggregate) {
+            Map<Enum, Long> aggMap = messageCollector.getKindAggregate(messageType);
+            logAggregate(aggMap, messageType, log);
+        } else {
+            Iterator<Message> messageIter = messageCollector.iterator();
+            while(messageIter.hasNext()) {
+                Message message = messageIter.next();
+                if(message.getMessageType() == messageType) {
+                    logMessage(message.getMessage(), messageType, log);
+                }
+            }
+        }
     }
-    
+
     public void logMessages(MessageType messageType, boolean aggregate, Log log) {
-    	logMessages(this, messageType, aggregate, log);
+        logMessages(this, messageType, aggregate, log);
     }
-    
+
     public static void logAllMessages(CompilationMessageCollector messageCollector, Log log) {
-    	Iterator<Message> messageIter = messageCollector.iterator();
-		while(messageIter.hasNext()) {
-			Message message = messageIter.next();
-			logMessage(message.getMessage(), message.getMessageType(), log);
-		}
+        Iterator<Message> messageIter = messageCollector.iterator();
+        while(messageIter.hasNext()) {
+            Message message = messageIter.next();
+            logMessage(message.getMessage(), message.getMessageType(), log);
+        }
     }
-    
+
     public void logAllMessages(Log log) {
-    	logAllMessages(this, log);
+        logAllMessages(this, log);
     }
 
     private static void logMessage(String messageString, MessageType messageType, Log log) {
-		switch(messageType) {
-		case Info:
-			log.info(messageString);
-			break;
-		case Warning:
-			log.warn(messageString);
-			break;
-		case Error:
-			log.error(messageString);
-		}
+        switch(messageType) {
+        case Info:
+            log.info(messageString);
+            break;
+        case Warning:
+            log.warn(messageString);
+            break;
+        case Error:
+            log.error(messageString);
+        }
     }
-    
+
 }

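Beyond the whitespace cleanup, the CompilationMessageCollector hunk above tags messages collected without an explicit kind as Unknown.UNKNOWN_MESSAGE_KIND, so they still appear in the aggregated warning counts instead of being dropped. A small usage sketch with an invented message:

    import java.util.Map;

    import org.apache.pig.impl.plan.CompilationMessageCollector;
    import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;

    public class CollectorDemo {
        public static void main(String[] args) {
            CompilationMessageCollector collector = new CompilationMessageCollector();
            // No explicit kind: previously invisible to getKindAggregate(),
            // now counted under UNKNOWN_MESSAGE_KIND.
            collector.collect("something looked odd", MessageType.Warning);
            Map<Enum, Long> counts = collector.getKindAggregate(MessageType.Warning);
            System.out.println(
                counts.get(CompilationMessageCollector.Unknown.UNKNOWN_MESSAGE_KIND));  // 1
        }
    }
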
Modified: pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java Mon Feb 24 21:41:38 2014
@@ -343,6 +343,8 @@ public class JarManager {
      */
     private static void addContainingJar(Vector<JarListEntry> jarList, Class clazz, String prefix, PigContext pigContext) {
         String jar = findContainingJar(clazz);
+        if (pigContext.predeployedJars.contains(jar))
+            return;
         if (pigContext.skipJars.contains(jar) && prefix == null)
             return;
         if (jar == null)

Modified: pig/branches/tez/src/org/apache/pig/impl/util/ObjectSerializer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/ObjectSerializer.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/ObjectSerializer.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/ObjectSerializer.java Mon Feb 24 21:41:38 2014
@@ -29,6 +29,8 @@ import java.util.zip.DeflaterOutputStrea
 import java.util.zip.InflaterInputStream;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 
 public class ObjectSerializer {
 
@@ -51,12 +53,15 @@ public class ObjectSerializer {
     public static Object deserialize(String str) throws IOException {
         if (str == null || str.length() == 0)
             return null;
+        ObjectInputStream objStream = null;
         try {
             ByteArrayInputStream serialObj = new ByteArrayInputStream(decodeBytes(str));
-            ObjectInputStream objStream = new ObjectInputStream(new InflaterInputStream(serialObj));
+            objStream = new ClassLoaderObjectInputStream(Thread.currentThread().getContextClassLoader(), new InflaterInputStream(serialObj));
             return objStream.readObject();
         } catch (Exception e) {
             throw new IOException("Deserialization error: " + e.getMessage(), e);
+        } finally {
+            IOUtils.closeQuietly(objStream);
         }
     }
 

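The ObjectSerializer hunk above deserializes through commons-io's ClassLoaderObjectInputStream bound to the thread context classloader and closes the stream in a finally block. A hedged round-trip sketch of the same pattern, without Pig's base64/deflate wrapping:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;

    import org.apache.commons.io.IOUtils;
    import org.apache.commons.io.input.ClassLoaderObjectInputStream;

    public class ContextClassLoaderDeserialize {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject("hello");
            out.close();

            ObjectInputStream in = null;
            try {
                // Classes are resolved through the context classloader, as in the patch.
                in = new ClassLoaderObjectInputStream(
                        Thread.currentThread().getContextClassLoader(),
                        new ByteArrayInputStream(bytes.toByteArray()));
                System.out.println(in.readObject());  // hello
            } finally {
                IOUtils.closeQuietly(in);  // mirrors the new finally block
            }
        }
    }
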
Modified: pig/branches/tez/src/org/apache/pig/impl/util/PropertiesUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/PropertiesUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/PropertiesUtil.java Mon Feb 24 21:41:38 2014
@@ -26,6 +26,7 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
 
 public class PropertiesUtil {
     private static final String DEFAULT_PROPERTIES_FILE = "/pig-default.properties";
@@ -143,6 +144,11 @@ public class PropertiesUtil {
             //by default we keep going on error on the backend
             properties.setProperty("stop.on.failure", ""+false);
         }
+
+        if (properties.getProperty(PigConfiguration.OPT_FETCH) == null) {
+            //by default fetch optimization is on
+            properties.setProperty(PigConfiguration.OPT_FETCH, ""+true);
+        }
     }
     
     /**

Modified: pig/branches/tez/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/Utils.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/Utils.java Mon Feb 24 21:41:38 2014
@@ -26,9 +26,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.SequenceInputStream;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketImplFactory;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
@@ -340,7 +337,7 @@ public class Utils {
     }
 
     public static FileInputLoadFunc getTmpFileStorageObject(Configuration conf) throws IOException {
-        Class<? extends FileInputLoadFunc> storageClass = getTmpFileStorage(ConfigurationUtil.toProperties(conf)).getStorageClass();
+        Class<? extends FileInputLoadFunc> storageClass = getTmpFileStorageClass(ConfigurationUtil.toProperties(conf));
         try {
             return storageClass.newInstance();
         } catch (InstantiationException e) {
@@ -350,6 +347,10 @@ public class Utils {
         }
     }
 
+    public static Class<? extends FileInputLoadFunc> getTmpFileStorageClass(Properties properties) {
+       return getTmpFileStorage(properties).getStorageClass();
+    }
+
     private static TEMPFILE_STORAGE getTmpFileStorage(Properties properties) {
         boolean tmpFileCompression = properties.getProperty(
                 PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, "false").equals("true");
@@ -369,7 +370,20 @@ public class Utils {
         }
     }
 
+    public static void setMapredCompressionCodecProps(Configuration conf) {
+        String codec = conf.get(
+                PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, "");
+        if ("".equals(codec) && conf.get("mapred.output.compression.codec") != null) {
+            conf.setBoolean("mapred.output.compress", true);
+        } else if(TEMPFILE_STORAGE.SEQFILE.ensureCodecSupported(codec)) {
+            conf.setBoolean("mapred.output.compress", true);
+            conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+        }
+        // no codec specified
+    }
+
     public static void setTmpFileCompressionOnConf(PigContext pigContext, Configuration conf) throws IOException{
+        // PIG-3741 This is also called for non-intermediate jobs, do not set any mapred properties here
         if (pigContext == null) {
             return;
         }
@@ -380,7 +394,6 @@ public class Utils {
         case INTER:
             break;
         case SEQFILE:
-            conf.setBoolean("mapred.output.compress", true);
             conf.set(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE, "seqfile");
             if("".equals(codec)) {
                 // codec is not specified, ensure  is set
@@ -389,7 +402,7 @@ public class Utils {
                     throw new IOException("mapred.output.compression.codec is not set");
                 }
             } else if(storage.ensureCodecSupported(codec)) {
-                conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+                // do nothing
             } else {
                 throw new IOException("Invalid temporary file compression codec [" + codec + "]. " +
                         "Expected compression codecs for " + storage.getStorageClass().getName() + " are " + storage.supportedCodecsToString() + ".");

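Together with the SequenceFileInterStorage change earlier, the Utils hunk above confines the mapred compression settings to setMapredCompressionCodecProps(), so they are applied only where intermediate sequence-file storage actually needs them (PIG-3741). A hedged sketch of the two Hadoop properties that call effectively sets when the temp-file codec (PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC) resolves to gzip; the snippet mirrors the effect only, not the codec lookup:

    import org.apache.hadoop.conf.Configuration;

    public class TmpFileCompressionDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Roughly what setMapredCompressionCodecProps() applies for a gzip temp-file codec.
            conf.setBoolean("mapred.output.compress", true);
            conf.set("mapred.output.compression.codec",
                     "org.apache.hadoop.io.compress.GzipCodec");
            System.out.println(conf.get("mapred.output.compression.codec"));
        }
    }
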
Modified: pig/branches/tez/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/FilterExtractor.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/FilterExtractor.java Mon Feb 24 21:41:38 2014
@@ -180,23 +180,35 @@ public class FilterExtractor {
     }
 
     private LogicalExpression andLogicalExpressions(
-            LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) {
+            LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) throws FrontendException {
         if (a == null) {
             return b;
         }
         if (b == null) {
             return a;
         }
+        if (!plan.ops.contains(a)) {
+            a = a.deepCopy(plan);
+        }
+        if (!plan.ops.contains(b)) {
+            b = b.deepCopy(plan);
+        }
         LogicalExpression andOp = new AndExpression(plan, a, b);
         return andOp;
     }
 
     private LogicalExpression orLogicalExpressions(
-            LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) {
+            LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) throws FrontendException {
         // Or 2 operators if they are not null
         if (a == null || b == null) {
             return null;
         }
+        if (!plan.ops.contains(a)) {
+            a = a.deepCopy(plan);
+        }
+        if (!plan.ops.contains(b)) {
+            b = b.deepCopy(plan);
+        }
         LogicalExpression orOp = new OrExpression(plan, a, b);
         return orOp;
     }
@@ -234,7 +246,7 @@ public class FilterExtractor {
             //              AND (leftState.filterExpr OR rightState.pushdownExpr)
             //              AND (leftState.filterExpr OR rightState.filterExpr)
             state.pushdownExpr = orLogicalExpressions(pushdownExprPlan, leftState.pushdownExpr, rightState.pushdownExpr);
-            if(state.pushdownExpr == null) {
+            if (state.pushdownExpr == null) {
                 // Whatever we did so far on the right tree is all wasted :(
                 // Undo all the mutation (AND OR distributions) until now
                 removeFromFilteredPlan(leftState.filterExpr);

Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Mon Feb 24 21:41:38 2014
@@ -506,6 +506,10 @@ public class ExpToPhyTranslationVisitor 
                     .getNextNodeId(DEFAULT_SCOPE)), -1,
                     null, op.getFuncSpec(), (EvalFunc) f);
             ((POUserFunc)p).setSignature(op.getSignature());
+            //reinitialize input schema from signature
+            if (((POUserFunc)p).getFunc().getInputSchema() == null) {
+                ((POUserFunc)p).setFuncInputSchema(op.getSignature());
+            }
             List<String> cacheFiles = ((EvalFunc)f).getCacheFiles();
             if (cacheFiles != null) {
                 ((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()]));

Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOForEach.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOForEach.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOForEach.java Mon Feb 24 21:41:38 2014
@@ -77,44 +77,25 @@ public class LOForEach extends LogicalRe
     }
     
     // Find the LOInnerLoad of the inner plan corresponding to the project, and 
-    // also find whether there is a LOForEach in inner plan along the way
+    // also find whether there is a relational operator in inner plan along the way
     public static Pair<List<LOInnerLoad>, Boolean> findReacheableInnerLoadFromBoundaryProject(ProjectExpression project) throws FrontendException {
         boolean needNewUid = false;
-        LogicalRelationalOperator referred = project.findReferent();
-        // If it is nested foreach, generate new uid
-        if (referred instanceof LOForEach)
-            needNewUid = true;
-        List<Operator> srcs = referred.getPlan().getSources();
         List<LOInnerLoad> innerLoads = new ArrayList<LOInnerLoad>();
-        for (Operator src:srcs) {
-            if (src instanceof LOInnerLoad) {
-            	if( src == referred ) {
-            		innerLoads.add( (LOInnerLoad)src );
-            		continue;
-            	}
-            	
-            	Deque<Operator> stack = new LinkedList<Operator>();
-                List<Operator> succs = referred.getPlan().getSuccessors( src );
-                if( succs != null ) {
-                	for( Operator succ : succs ) {
-                		stack.push( succ );
-                	}
-                }
-                
-                while( !stack.isEmpty() ) {
-                	Operator op = stack.pop();
-                    if( op == referred ) {
-                        innerLoads.add((LOInnerLoad)src);
-                        break;
-                    }
-                    else {
-                    	List<Operator> ops = referred.getPlan().getSuccessors( op );
-                    	if( ops != null ) {
-                        	for( Operator o : ops ) {
-                        		stack.push( o );
-                        	}
-                    	}
-                    }
+        LogicalRelationalOperator referred = project.findReferent();
+        Deque<Operator> stack = new LinkedList<Operator>();
+        stack.add(referred);
+        while( !stack.isEmpty() ) {
+            Operator op = stack.pop();
+            if (op instanceof LOInnerLoad) {
+                innerLoads.add((LOInnerLoad)op);
+            }
+            else if (!(op instanceof LOGenerate)) {
+                needNewUid = true;
+            }
+            List<Operator> ops = referred.getPlan().getPredecessors( op );
+            if( ops != null ) {
+                for( Operator o : ops ) {
+                    stack.push( o );
                 }
             }
         }

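For reference, the rewritten method now walks predecessors starting from the referred operator, collecting every LOInnerLoad it reaches and flagging needNewUid whenever a relational operator other than LOGenerate is crossed. A hypothetical caller (illustrative only) reading the returned Pair:

    import java.util.List;

    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.impl.util.Pair;
    import org.apache.pig.newplan.logical.expression.ProjectExpression;
    import org.apache.pig.newplan.logical.relational.LOForEach;
    import org.apache.pig.newplan.logical.relational.LOInnerLoad;

    class InnerLoadInspector {
        // Hypothetical helper: report what the traversal found for a boundary project.
        static void inspect(ProjectExpression project) throws FrontendException {
            Pair<List<LOInnerLoad>, Boolean> result =
                    LOForEach.findReacheableInnerLoadFromBoundaryProject(project);
            List<LOInnerLoad> innerLoads = result.first;
            boolean needNewUid = result.second;
            System.out.println(innerLoads.size() + " inner load(s); needNewUid=" + needNewUid);
        }
    }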
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Mon Feb 24 21:41:38 2014
@@ -144,11 +144,12 @@ public class LOGenerate extends LogicalR
                         fs.stampFieldSchema();
                         mergedSchema.addField(new LogicalFieldSchema(fs));
                     }
-                    if(mergedSchema.size() == 1 && mergedSchema.getField(0).type == DataType.NULL){
-                        //this is the use case where a new alias has been specified by user
-                        mergedSchema.getField(0).type = DataType.BYTEARRAY;
+                    for (LogicalFieldSchema fs : mergedSchema.getFields()) {
+                        if (fs.type == DataType.NULL){
+                            //this is the use case where a new alias has been specified by user
+                            fs.type = DataType.BYTEARRAY;
+                        }
                     }
-                
                 } else {
 
                     // Merge uid with the exp field schema

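With the loop above, every field whose type is still unknown after the merge defaults to bytearray, not just the single-field case handled before; this is what the new e2e test further below expects when it describes flatten(STRSPLIT(line)) as (i0, i1, i2). A minimal sketch of the defaulting step in isolation:

    import org.apache.pig.data.DataType;
    import org.apache.pig.newplan.logical.relational.LogicalSchema;
    import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;

    class SchemaDefaulting {
        // A field left with type NULL (only a user-supplied alias is known)
        // is treated as bytearray.
        static void defaultUnknownTypes(LogicalSchema mergedSchema) {
            for (LogicalFieldSchema fs : mergedSchema.getFields()) {
                if (fs.type == DataType.NULL) {
                    fs.type = DataType.BYTEARRAY;
                }
            }
        }
    }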
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOSort.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOSort.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOSort.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOSort.java Mon Feb 24 21:41:38 2014
@@ -31,8 +31,8 @@ import org.apache.pig.newplan.OperatorPl
 import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.newplan.ReverseDependencyOrderWalker;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
-import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.visitor.ResetProjectionAttachedRelationalOpVisitor;
 
 public class LOSort extends LogicalRelationalOperator{
     private List<Boolean> mAscCols;
@@ -200,23 +200,6 @@ public class LOSort extends LogicalRelat
         return plan.getPredecessors(this).get(0);
     }
 
-    private static class ResetProjectionAttachedRelationalOpVisitor
-        extends LogicalExpressionVisitor {
-        private LogicalRelationalOperator attachedRelationalOp;
-
-        ResetProjectionAttachedRelationalOpVisitor (
-            LogicalExpressionPlan plan, LogicalRelationalOperator op )
-            throws FrontendException {
-            super(plan, new ReverseDependencyOrderWalker(plan));
-            this.attachedRelationalOp = op;
-
-        }
-        @Override
-        public void visit(ProjectExpression pe) throws FrontendException {
-            pe.setAttachedRelationalOp(attachedRelationalOp);
-        }
-    }
-
     public static LOSort createCopy(LOSort sort) throws FrontendException {
         LOSort newSort = new LOSort(sort.getPlan(), null,
                                     sort.getAscendingCols(),

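The nested visitor deleted above is promoted to org.apache.pig.newplan.logical.visitor.ResetProjectionAttachedRelationalOpVisitor so that LogicalPlanBuilder can reuse it (see the split-plan change further below). Assuming the promoted class keeps the shape of the removed one, it would look roughly like this:

    package org.apache.pig.newplan.logical.visitor;

    import org.apache.pig.impl.logicalLayer.FrontendException;
    import org.apache.pig.newplan.ReverseDependencyOrderWalker;
    import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
    import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
    import org.apache.pig.newplan.logical.expression.ProjectExpression;
    import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;

    // Re-points every ProjectExpression in the plan at the given relational operator.
    public class ResetProjectionAttachedRelationalOpVisitor extends LogicalExpressionVisitor {
        private final LogicalRelationalOperator attachedRelationalOp;

        public ResetProjectionAttachedRelationalOpVisitor(LogicalExpressionPlan plan,
                LogicalRelationalOperator op) throws FrontendException {
            super(plan, new ReverseDependencyOrderWalker(plan));
            this.attachedRelationalOp = op;
        }

        @Override
        public void visit(ProjectExpression pe) throws FrontendException {
            pe.setAttachedRelationalOp(attachedRelationalOp);
        }
    }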
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java Mon Feb 24 21:41:38 2014
@@ -19,7 +19,9 @@ package org.apache.pig.newplan.logical.r
 
 import java.io.IOException;
 
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.OverwritableStoreFunc;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFuncInterface;
@@ -91,8 +93,22 @@ public class InputOutputFileValidator {
                     errCode = 4000;
                     break;
                 }
-                validationErrStr  += ioe.getMessage();
-                throw new VisitorException(store, validationErrStr, errCode, errSrc, ioe);
+                
+                boolean shouldThrowException = true;
+                if (sf instanceof OverwritableStoreFunc) {
+                    if (((OverwritableStoreFunc) sf).shouldOverwrite()) {
+                        if (ioe instanceof FileAlreadyExistsException
+                                || ioe instanceof org.apache.hadoop.fs.FileAlreadyExistsException) {
+                            shouldThrowException = false;
+                        }
+                    }
+                }
+                if (shouldThrowException) {
+                    validationErrStr += ioe.getMessage();
+                    throw new VisitorException(store, validationErrStr,
+                            errCode, errSrc, ioe);
+                }
+
             } catch (InterruptedException ie) {
                 validationErrStr += ie.getMessage();
                 throw new VisitorException(store, validationErrStr, errCode, pigCtx.getErrorSource(), ie);

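The new branch only swallows the failure when the store function explicitly opts in to overwriting and the error really is an output-already-exists condition, in either its mapred or fs flavor. The decision extracted into a small sketch:

    import java.io.IOException;

    import org.apache.hadoop.mapred.FileAlreadyExistsException;
    import org.apache.pig.OverwritableStoreFunc;
    import org.apache.pig.StoreFuncInterface;

    class OverwriteCheck {
        // Rethrow unless the store func is overwritable and the error means the
        // output location already exists.
        static boolean shouldRethrow(StoreFuncInterface sf, IOException ioe) {
            boolean overwritable = sf instanceof OverwritableStoreFunc
                    && ((OverwritableStoreFunc) sf).shouldOverwrite();
            boolean alreadyExists = ioe instanceof FileAlreadyExistsException
                    || ioe instanceof org.apache.hadoop.fs.FileAlreadyExistsException;
            return !(overwritable && alreadyExists);
        }
    }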
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java Mon Feb 24 21:41:38 2014
@@ -20,6 +20,7 @@ package org.apache.pig.newplan.logical.v
 import java.util.Map;
 
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigWarning;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -107,7 +108,8 @@ public class CastLineageSetter extends A
                     String msg = "Cannot resolve load function to use for casting from " + 
                                 DataType.findTypeName(inType) + " to " +
                                 DataType.findTypeName(outType) + ". ";
-                    msgCollector.collect(msg, MessageType.Warning);
+                    msgCollector.collect(msg, MessageType.Warning,
+                           PigWarning.NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY);
                 }else {
                     cast.setFuncSpec(inLoadFunc);
                 }

Modified: pig/branches/tez/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/AliasMasker.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/AliasMasker.g Mon Feb 24 21:41:38 2014
@@ -85,11 +85,15 @@ query : ^( QUERY statement* )
 statement : general_statement
           | split_statement
           | realias_statement
+          | assert_statement
 ;
 
 split_statement : split_clause
 ;
 
+assert_statement: assert_clause
+;
+
 realias_statement : realias_clause
 ;
 
@@ -132,6 +136,7 @@ op_clause : define_clause
           | split_clause
           | foreach_clause
           | cube_clause
+          | assert_clause
 ;
 
 define_clause
@@ -299,6 +304,13 @@ store_clause
     : ^( STORE alias filename func_clause? )
 ;
 
+assert_clause
+    : ^( ASSERT alias cond comment? )
+;
+
+comment : QUOTEDSTRING
+;
+
 filter_clause
     : ^( FILTER rel cond )
 ;
@@ -695,6 +707,7 @@ eid : rel_str_op
     | TOBAG
     | TOMAP
     | TOTUPLE
+    | ASSERT
 ;
 
 // relational operator

Modified: pig/branches/tez/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/AstPrinter.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/AstPrinter.g Mon Feb 24 21:41:38 2014
@@ -57,6 +57,7 @@ statement : general_statement
           | split_statement { sb.append(";\n"); }
           | import_statement { sb.append(";\n"); }
           | register_statement { sb.append(";\n"); }
+          | assert_statement { sb.append(";\n"); }
           | realias_statement
 ;
 
@@ -76,6 +77,9 @@ register_statement : ^( REGISTER QUOTEDS
                         } scripting_udf_clause? )
 ;
 
+assert_statement : assert_clause
+;
+
 scripting_udf_clause : scripting_language_clause scripting_namespace_clause
 ;
 
@@ -735,6 +739,7 @@ eid : rel_str_op
     | TOTUPLE    { sb.append($TOTUPLE.text); }
     | IN         { sb.append($IN.text); }
     | CASE       { sb.append($CASE.text); }
+    | ASSERT     { sb.append($ASSERT.text); }
 ;
 
 // relational operator

Modified: pig/branches/tez/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/AstValidator.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/AstValidator.g Mon Feb 24 21:41:38 2014
@@ -117,6 +117,7 @@ statement : general_statement
           | split_statement
           | realias_statement
           | register_statement
+          | assert_statement
 ;
 
 split_statement : split_clause
@@ -128,6 +129,9 @@ realias_statement : realias_clause
 register_statement : ^( REGISTER QUOTEDSTRING (USING IDENTIFIER AS IDENTIFIER)? )
 ;
 
+assert_statement : assert_clause
+;
+
 general_statement : ^( STATEMENT ( alias { aliases.add( $alias.name ); } )? op_clause parallel_clause? )
 ;
 
@@ -724,6 +728,7 @@ eid : rel_str_op
     | TOBAG
     | TOMAP
     | TOTUPLE
+    | ASSERT
 ;
 
 // relational operator

Modified: pig/branches/tez/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/parser/LogicalPlanBuilder.java Mon Feb 24 21:41:38 2014
@@ -99,6 +99,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.rules.OptimizerUtils;
 import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
 import org.apache.pig.newplan.logical.visitor.ProjectStarExpander;
+import org.apache.pig.newplan.logical.visitor.ResetProjectionAttachedRelationalOpVisitor;
 
 public class LogicalPlanBuilder {
 
@@ -296,6 +297,18 @@ public class LogicalPlanBuilder {
         }
         // using De Morgan's law (!A && !B) == !(A || B)
         currentExpr = new NotExpression(splitPlan, currentExpr);
+
+        try {
+            // Going through all the ProjectExpressions that were cloned
+            // and updating their attached operator from the original
+            // LOSplitOutput to the "otherwise" LOSplitOutput
+            // (PIG-3641)
+            new ResetProjectionAttachedRelationalOpVisitor(splitPlan, op).visit();
+        } catch (FrontendException e) {
+            e.printStackTrace();
+            throw new PlanGenerationFailureException(intStream, loc, e);
+        }
+
         op.setFilterPlan(splitPlan);
         return buildOp(loc, op, alias, inputAlias, null);
     }

Modified: pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g Mon Feb 24 21:41:38 2014
@@ -173,6 +173,7 @@ scope {
  : general_statement
  | split_statement
  | realias_statement
+ | assert_statement
  | register_statement
 ;
 
@@ -182,6 +183,9 @@ split_statement : split_clause
 realias_statement : realias_clause
 ;
 
+assert_statement : assert_clause
+;
+
 register_statement
 : ^( REGISTER QUOTEDSTRING (USING IDENTIFIER AS IDENTIFIER)? )
   {
@@ -2002,6 +2006,7 @@ eid returns[String id] : rel_str_op { $i
     | TOBAG { $id = "TOBAG"; }
     | TOMAP { $id = "TOMAP"; }
     | TOTUPLE { $id = "TOTUPLE"; }
+    | ASSERT { $id = "ASSERT"; } 
 ;
 
 // relational operator

Modified: pig/branches/tez/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/QueryParser.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/QueryParser.g Mon Feb 24 21:41:38 2014
@@ -224,6 +224,7 @@ statement : SEMI_COLON!
           | import_clause SEMI_COLON!
           | realias_clause SEMI_COLON!
           | register_clause SEMI_COLON!
+          | assert_clause SEMI_COLON!
           // semicolons after foreach_complex_statement are optional for backwards compatibility, but to keep
           // the grammar unambiguous if there is one then we'll parse it as a single, standalone semicolon
           // (which matches the first statement rule)
@@ -382,7 +383,6 @@ op_clause : define_clause
           | union_clause
           | stream_clause
           | mr_clause
-          | assert_clause
 ;
 
 ship_clause : SHIP^ LEFT_PAREN! path_list? RIGHT_PAREN!
@@ -1029,6 +1029,7 @@ eid_without_columns : rel_str_op
     | FULL
     | REALIAS
     | BOOL_COND
+    | ASSERT
 ;
 
 eid : eid_without_columns
@@ -1075,6 +1076,5 @@ reserved_identifier_whitelist : RANK
                               | THEN
                               | ELSE
                               | END
-                              | ASSERT
 ;
 

Modified: pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Mon Feb 24 21:41:38 2014
@@ -247,11 +247,12 @@ TOKEN_MGR_DECLS : {
 {
     <"split"> : PIG_START
 |   <"define"> : PIG_START
-|	<"store"> : PIG_START
-|	<"import"> : PIG_START
-| 	<(["a"-"z", "A"-"Z"])+(["a"-"z", "A"-"Z"] | ["0"-"9"] | "_")*(" " | "\t")*"="> : PIG_START
+|   <"store"> : PIG_START
+|   <"assert"> : PIG_START
+|   <"import"> : PIG_START
+|   <(["a"-"z", "A"-"Z"])+(["a"-"z", "A"-"Z"] | ["0"-"9"] | "_")*(" " | "\t")*"="> : PIG_START
 |   <"=>" (" " | "\t")*> : PIG_START
-| 	< <IDENTIFIER> (" " | "\t")* ("," (" " | "\t")* <IDENTIFIER> )* (" " | "\t")* "="> : PIG_START
+|   < <IDENTIFIER> (" " | "\t")* ("," (" " | "\t")* <IDENTIFIER> )* (" " | "\t")* "="> : PIG_START
 |   < <IDENTIFIER> (" " | "\t")* "(" > : PIG_START
 }
 

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Mon Feb 24 21:41:38 2014
@@ -142,15 +142,24 @@ public class PigStatsUtil {
 
 
     public static void setErrorMessage(String msg) {
-        PigStats.get().setErrorMessage(msg);
+        PigStats ps = PigStats.get();
+        if (ps != null) {
+            ps.setErrorMessage(msg);
+        }
     }
 
     public static void setErrorCode(int code) {
-        PigStats.get().setErrorCode(code);
+        PigStats ps = PigStats.get();
+        if (ps != null) {
+            ps.setErrorCode(code);
+        }
     }
 
     public static void setErrorThrowable(Throwable t) {
-        PigStats.get().setErrorThrowable(t);
+        PigStats ps = PigStats.get();
+        if (ps != null) {
+            ps.setErrorThrowable(t);
+        }
     }
 
     private static Pattern pattern = Pattern.compile("tmp(-)?[\\d]{1,10}$");

Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
  Merged /pig/trunk/src/pig-default.properties:r1554090-1571421

Modified: pig/branches/tez/test/e2e/pig/tests/cmdline.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/cmdline.conf?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/cmdline.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/cmdline.conf Mon Feb 24 21:41:38 2014
@@ -68,6 +68,7 @@ describe A;\,
 #                        #JIRA[PIG-373]
 #			{
 #			'num' => 4,
+#			'java_params' => ['-Dopt.fetch=false'],
 #			'pig' => q\ 
 #A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray);
 #describe A;
@@ -240,6 +241,7 @@ describe D;\,
 		
 			{
 			'num' => 1,
+			'java_params' => ['-Dopt.fetch=false'],
 			'pig' => q\ 
 A = load ':INPATH:/singlefile/unicode100' as (name:chararray);
 dump A;\,

Modified: pig/branches/tez/test/e2e/pig/tests/negative.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/negative.conf?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/negative.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/negative.conf Mon Feb 24 21:41:38 2014
@@ -34,6 +34,7 @@ $cfg = {
 		'tests' => [
 			{
 			'num' => 1,
+			'java_params' => ['-Dopt.fetch=false'],
 			'pig' => q\
 a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = group a by name;
@@ -48,6 +49,7 @@ dump c;\,
 		'tests' => [
 			{
 			'num' => 1,
+			'java_params' => ['-Dopt.fetch=false'],
 			'pig' => q\a = load '/user/gates/nosuchfile'; dump a;\,
 			'expected_err_regex' => "ERROR 2118: Input path does not exist",
 			},
@@ -248,6 +250,7 @@ store a into ':INPATH:/singlefile/fileex
                         {
 			# missing quotes around command
                         'num' => 1,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
 B = foreach A generate $2, $1, $0;
@@ -259,6 +262,7 @@ dump C;#,
                         {
 			# input spec missing parenthesis
                         'num' => 2,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 define CMD `perl PigStreaming.pl foo -` input 'foo' using PigStorage() ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -269,6 +273,7 @@ dump B;#,
                         {
 			# no serializer name after using
                         'num' => 3,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 define CMD `perl PigStreaming.pl foo -` output ('foo' using );
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -279,6 +284,7 @@ dump B;#,
                         {
 			# alias name missing from define
                         'num' => 4,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 define `perl PigStreaming.pl foo -`;
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -289,6 +295,7 @@ dump B;#,
                         {
 			# quotes missing from name of the file in ship script
                         'num' => 5,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q#
 define CMD `perl PigStreaming.pl foo -` ship(:SCRIPTHOMEPATH:/PigStreaming.pl);
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -306,6 +313,7 @@ dump B;#,
 			# Define uses using non-existent command (autoship)
                         'num' => 1,
 			'execonly' => 'mapred',
+			'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreamingNotThere.pl`;
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -316,6 +324,7 @@ dump B;\,
                         {
 			# Define uses non-existent command with ship clause
                         'num' => 2,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreamingNotThere.pl foo -` ship(':SCRIPTHOMEPATH:/PigStreamingNotThere.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -339,6 +348,7 @@ dump E;\,
                         {
 			# Define uses non-existent serializer
                         'num' => 4,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreaming.pl foo -` input('foo' using SerializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -349,6 +359,7 @@ dump B;\,
                         {
 			# Define uses non-existent deserializer
                         'num' => 5,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreaming.pl` output(stdout using DeserializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -359,6 +370,7 @@ dump B;\,
                         {
 			# Invalid skip path
                         'num' => 6,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 set stream.skippath 'foo';
 define CMD `perl PigStreaming.pl`;
@@ -370,6 +382,7 @@ dump B;\,
                         {
 			# Invalid command alias in stream operator
                         'num' => 7,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreaming.pl`;
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -380,6 +393,7 @@ dump B;\,
                         {
 			# Invalid operator alias in stream operator
                         'num' => 8,
+                        'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreaming.pl`;
 A = load ':INPATH:/singlefile/studenttab10k';
@@ -483,6 +497,7 @@ store D into ':OUTPATH:';\,
 			# Define uses using non-existent command
                         'num' => 1,
 						'execonly' => 'local',
+						'java_params' => ['-Dopt.fetch=false'],
                         'pig' => q\
 define CMD `perl PigStreamingNotThere.pl`;
 A = load ':INPATH:/singlefile/studenttab10k';

Modified: pig/branches/tez/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/nightly.conf?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/nightly.conf Mon Feb 24 21:41:38 2014
@@ -2173,6 +2173,14 @@ describe A;
 store A into ':OUTPATH:';\,
 
 			},
+			{
+			'num' => 2,
+			'pig' => q\ 
+A = load 'sample' as (line:chararray);
+B = foreach A generate flatten(STRSPLIT(line)) as (i0, i1, i2);
+describe B;\,
+                        'expected_out_regex'  => 'B: {i0: bytearray,i1: bytearray,i2: bytearray}',
+			},
 		],
 		},
 		{
@@ -4499,9 +4507,9 @@ store C into ':OUTPATH:';\, 
                 },
                 {
                     # Test Union using merge with incompatible types.  float->bytearray and chararray->bytearray
-			        'num' => 8,
-			        'delimiter' => '	',
-			        'pig' => q\
+                    'num' => 8,
+                    'delimiter' => '	',
+                    'pig' => q\
 A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int);
 B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:chararray);
 C = union onschema A, B;
@@ -4511,17 +4519,18 @@ A = load ':INPATH:/singlefile/studenttab
 B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:bytearray);
 C = union A, B;
 store C into ':OUTPATH:';\,
- 				}
-		]
+                }
+              ]
 
             },
-			{
+            {
 
-			# Test Union using merge with Simple data types
-	        'name' => 'UdfDistributedCache',
+            # Test Union using merge with Simple data types
+            'name' => 'UdfDistributedCache',
             'tests' => [
-            	{
-            		'num' => 1,
+                {
+                    'num' => 1,
+                    'java_params' => ['-Dopt.fetch=false'],
                     'execonly' => 'mapred', # since distributed cache is not supported in local mode
                     'pig' => q?
                         register :FUNCPATH:/testudf.jar;
@@ -4531,8 +4540,8 @@ store C into ':OUTPATH:';\,
                         c = foreach b generate udfdc(age);
                         dump c;?,
                     'expected_out_regex' => ":UdfDistributedCache_1_out:",
-            	},
-	        ]
+                },
+              ]
             }, {
                 'name' => 'MonitoredUDF',
                 'tests' => [
@@ -4793,6 +4802,24 @@ store C into ':OUTPATH:';\,
                                 I = limit H 3;
                                 J = foreach I generate contributions;
                                 STORE J INTO ':OUTPATH:.2';?,
+                    }, {
+                        # PIG-3641
+                        'num' => 6,
+                        'pig' => q?A = LOAD ':INPATH:/singlefile/votertab10k' AS (name, age, registration, contributions);
+                                -- dropping one column to force columnprune
+                                B = foreach A generate name, age, registration;
+                                -- Next line is the only difference
+                                SPLIT B into C1 if age > 50, C2 otherwise;
+                                D1 = foreach C1 generate age, registration;
+                                STORE D1 INTO ':OUTPATH:.1';
+                                STORE C2 INTO ':OUTPATH:.2';?,
+                        'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/votertab10k' AS (name, age, registration, contributions);
+                                -- dropping one column to force columnprune
+                                B = foreach A generate name, age, registration;
+                                SPLIT B into C1 if age > 50, C2 if age <= 50;
+                                D1 = foreach C1 generate age, registration;
+                                STORE D1 INTO ':OUTPATH:.1';
+                                STORE C2 INTO ':OUTPATH:.2';?,
                     }
                 ],
             },{