You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 08:19:46 UTC

svn commit: r1784237 [11/22] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Modified: pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java Fri Feb 24 08:19:42 2017
@@ -32,7 +32,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.PriorityQueue;
-  
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigCounters;
@@ -44,14 +44,14 @@ import org.apache.pig.PigWarning;
  * stored unsorted as it comes in, and only sorted when it is time to dump
  * it to a file or when the first iterator is requested.  Experementation
  * found this to be the faster than storing it sorted to begin with.
- * 
+ *
  * We allow a user defined comparator, but provide a default comparator in
  * cases where the user doesn't specify one.
  */
 public class SortedDataBag extends DefaultAbstractBag{
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;
 
@@ -76,7 +76,7 @@ public class SortedDataBag extends Defau
 
         @Override
         public int hashCode() {
-            return 42; 
+            return 42;
         }
 
     }
@@ -95,12 +95,12 @@ public class SortedDataBag extends Defau
     public boolean isSorted() {
         return true;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return false;
     }
-    
+
     @Override
     public Iterator<Tuple> iterator() {
         return new SortedDataBagIterator();
@@ -145,12 +145,15 @@ public class SortedDataBag extends Defau
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-            } catch (IOException ioe) {
+                out.close();
+                out = null;
+                mContents.clear();
+            } catch (Throwable e) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
                 return 0;
             } finally {
                 if (out != null) {
@@ -161,7 +164,6 @@ public class SortedDataBag extends Defau
                     }
                 }
             }
-            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -203,7 +205,7 @@ public class SortedDataBag extends Defau
 
             @Override
             public int hashCode() {
-                return tuple.hashCode(); 
+                return tuple.hashCode();
             }
         }
 
@@ -228,7 +230,7 @@ public class SortedDataBag extends Defau
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
@@ -341,7 +343,7 @@ public class SortedDataBag extends Defau
                 Iterator<File> i = mSpillFiles.iterator();
                 while (i.hasNext()) {
                     try {
-                        DataInputStream in = 
+                        DataInputStream in =
                             new DataInputStream(new BufferedInputStream(
                                 new FileInputStream(i.next())));
                         mStreams.add(in);
@@ -351,7 +353,7 @@ public class SortedDataBag extends Defau
                     } catch (FileNotFoundException fnfe) {
                         // We can't find our own spill file?  That should
                         // never happen.
-                        String msg = "Unable to find our spill file."; 
+                        String msg = "Unable to find our spill file.";
                         log.fatal(msg, fnfe);
                         throw new RuntimeException(msg, fnfe);
                     }
@@ -411,7 +413,7 @@ public class SortedDataBag extends Defau
                         in.close();
                     }catch(IOException e) {
                         log.warn("Failed to close spill file.", e);
-                    }                	
+                    }
                     mStreams.set(fileNum, null);
                 } catch (IOException ioe) {
                     String msg = "Unable to find our spill file.";
@@ -518,7 +520,7 @@ public class SortedDataBag extends Defau
                         log.warn("Failed to delete spill file: " + f.getPath());
                     }
                 }
-                
+
                 // clear the list, so that finalize does not delete any files,
                 // when mSpillFiles is assigned a new value
                 mSpillFiles.clear();

Modified: pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java Fri Feb 24 08:19:42 2017
@@ -29,7 +29,7 @@ import org.apache.pig.classification.Int
 
 /**
  * Common functionality for proactively spilling bags that need to keep the data
- * sorted. 
+ * sorted.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -54,9 +54,9 @@ public abstract class SortedSpillBag ext
         //count for number of objects that have spilled
         if(mSpillFiles == null)
             incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS);
-        
+
         long spilled = 0;
-        
+
         DataOutputStream out = null;
         try {
             out = getSpillFile();
@@ -71,13 +71,13 @@ public abstract class SortedSpillBag ext
             //sort the tuples
             // as per documentation of collection.sort(), it copies to an array,
             // sorts and copies back to collection
-            // Avoiding that extra copy back to collection (mContents) by 
+            // Avoiding that extra copy back to collection (mContents) by
             // copying to an array and using Arrays.sort
             Tuple[] array = new Tuple[mContents.size()];
             mContents.toArray(array);
             if(comp == null)
                 Arrays.sort(array);
-            else 
+            else
                 Arrays.sort(array,comp);
 
             //dump the array
@@ -89,12 +89,15 @@ public abstract class SortedSpillBag ext
             }
 
             out.flush();
-        } catch (IOException ioe) {
+            out.close();
+            out = null;
+            mContents.clear();
+        } catch (Throwable e) {
             // Remove the last file from the spilled array, since we failed to
             // write to it.
             mSpillFiles.remove(mSpillFiles.size() - 1);
             warn(
-                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
             return 0;
         } finally {
             if (out != null) {
@@ -105,11 +108,9 @@ public abstract class SortedSpillBag ext
                 }
             }
         }
-        mContents.clear();
-        
         incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, spilled);
-        
+
         return spilled;
     }
-    
+
 }

Modified: pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java Fri Feb 24 08:19:42 2017
@@ -28,7 +28,7 @@ public class UnlimitedNullTuple extends
 
     @Override
     public int size() {
-        throw new RuntimeException("Unimplemented");
+        return Integer.MAX_VALUE;
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java Fri Feb 24 08:19:42 2017
@@ -61,25 +61,25 @@ public class SedesHelper {
     public static void writeChararray(DataOutput out, String s) throws IOException {
         // a char can take up to 3 bytes in the modified utf8 encoding
         // used by DataOutput.writeUTF, so use UNSIGNED_SHORT_MAX/3
-        if (s.length() < BinInterSedes.UNSIGNED_SHORT_MAX / 3) {
+        byte[] utfBytes = s.getBytes(BinInterSedes.UTF8);
+        int length = utfBytes.length;
+        if (length < BinInterSedes.UNSIGNED_SHORT_MAX) {
             out.writeByte(BinInterSedes.SMALLCHARARRAY);
-            out.writeUTF(s);
+            out.writeShort(length);
         } else {
-            byte[] utfBytes = s.getBytes(BinInterSedes.UTF8);
-            int length = utfBytes.length;
-
             out.writeByte(BinInterSedes.CHARARRAY);
             out.writeInt(length);
-            out.write(utfBytes);
         }
+        out.write(utfBytes);
     }
 
     public static String readChararray(DataInput in, byte type) throws IOException {
+        int size;
         if (type == BinInterSedes.SMALLCHARARRAY) {
-            return in.readUTF();
+            size = in.readUnsignedShort();
+        } else {
+            size = in.readInt();
         }
-
-        int size = in.readInt();
         byte[] buf = new byte[size];
         in.readFully(buf);
         return new String(buf, BinInterSedes.UTF8);

Modified: pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java Fri Feb 24 08:19:42 2017
@@ -38,6 +38,12 @@ public class PigImplConstants {
     public static final String PIG_OPTIMIZER_RULES_KEY = "pig.optimizer.rules";
 
     /**
+     * Used by pig to indicate that current job is running in local mode (local/tez_local)
+     * ie. ExecType.isLocal() is true
+     */
+    public static final String PIG_EXECTYPE_MODE_LOCAL = "pig.exectype.mode.local";
+
+    /**
      * Used by pig to indicate that current job has been converted to run in local mode
      */
     public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
@@ -63,4 +69,24 @@ public class PigImplConstants {
      * Parallelism to be used for CROSS operation by GFCross UDF
      */
     public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism";
+
+    /**
+     * Pig context
+     */
+    public static final String PIG_CONTEXT = "pig.pigContext";
+
+    /**
+     * Pig log4j properties
+     */
+    public static final String PIG_LOG4J_PROPERTIES = "pig.log4j.properties";
+
+    /**
+     * A unique id for a Pig session used as callerId for underlining component
+     */
+    public static final String PIG_AUDIT_ID = "pig.script.id";
+
+    // Kill the jobs before cleaning up tmp files
+    public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3;
+    public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2;
+    public static int SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY = 1;
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Fri Feb 24 08:19:42 2017
@@ -64,13 +64,13 @@ import org.apache.pig.impl.util.ObjectSe
 public class DefaultIndexableLoader extends LoadFunc implements IndexableLoadFunc{
 
     private static final Log LOG = LogFactory.getLog(DefaultIndexableLoader.class);
-    
+
     // FileSpec of index file which will be read from HDFS.
     private String indexFile;
     private String indexFileLoadFuncSpec;
-    
+
     private LoadFunc loader;
-    // Index is modeled as FIFO queue and LinkedList implements java Queue interface.  
+    // Index is modeled as FIFO queue and LinkedList implements java Queue interface.
     private LinkedList<Tuple> index;
     private FuncSpec rightLoaderFuncSpec;
 
@@ -79,9 +79,9 @@ public class DefaultIndexableLoader exte
     private transient TupleFactory mTupleFactory;
 
     private String inpLocation;
-    
+
     public DefaultIndexableLoader(
-            String loaderFuncSpec, 
+            String loaderFuncSpec,
             String indexFile,
             String indexFileLoadFuncSpec,
             String scope,
@@ -93,39 +93,39 @@ public class DefaultIndexableLoader exte
         this.scope = scope;
         this.inpLocation = inputLocation;
     }
-    
+
     @SuppressWarnings("unchecked")
     @Override
     public void seekNear(Tuple keys) throws IOException{
         // some setup
         mTupleFactory = TupleFactory.getInstance();
 
-        /* Currently whole of index is read into memory. Typically, index is small. Usually 
+        /* Currently whole of index is read into memory. Typically, index is small. Usually
            few KBs in size. So, this should not be an issue.
            However, reading whole index at startup time is not required. So, this can be improved upon.
            Assumption: Index being read is sorted on keys followed by filename, followed by offset.
          */
 
         // Index is modeled as FIFO Queue, that frees us from keeping track of which index entry should be read next.
-        
+
         // the keys are sent in a tuple. If there is really only
         // 1 join key, it would be the first field of the tuple. If
         // there are multiple Join keys, the tuple itself represents
         // the join key
         Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
         POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)));
-                
+
         Properties props = ConfigurationUtil.getLocalFSProperties();
         PigContext pc = new PigContext(ExecType.LOCAL, props);
         ld.setPc(pc);
         index = new LinkedList<Tuple>();
         for(Result res=ld.getNextTuple();res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNextTuple())
-            index.offer((Tuple) res.result);   
+            index.offer((Tuple) res.result);
+
 
-        
         Tuple prevIdxEntry = null;
         Tuple matchedEntry;
-     
+
         // When the first call is made, we need to seek into right input at correct offset.
         while(true){
             // Keep looping till we find first entry in index >= left key
@@ -148,15 +148,15 @@ public class DefaultIndexableLoader exte
                 prevIdxEntry = curIdxEntry;
                 continue;
             }
-            
+
             if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){
                 index.addFirst(curIdxEntry);  // We need to add back the current index Entry because we are reading ahead.
                 if(null == prevIdxEntry)   // very first entry in index.
                     matchedEntry = curIdxEntry;
                 else{
-                    matchedEntry = prevIdxEntry; 
+                    matchedEntry = prevIdxEntry;
                     // start join from previous idx entry, it might have tuples
-                    // with this key                    
+                    // with this key
                     index.addFirst(prevIdxEntry);
                 }
                 break;
@@ -168,43 +168,43 @@ public class DefaultIndexableLoader exte
         if (matchedEntry == null) {
             LOG.warn("Empty index file: input directory is empty");
         } else {
-        
+
             Object extractedKey = extractKeysFromIdxTuple(matchedEntry);
-            
+
             if (extractedKey != null) {
                 Class idxKeyClass = extractedKey.getClass();
                 if( ! firstLeftKey.getClass().equals(idxKeyClass)){
-    
+
                     // This check should indeed be done on compile time. But to be on safe side, we do it on runtime also.
                     int errCode = 2166;
                     String errMsg = "Key type mismatch. Found key of type "+firstLeftKey.getClass().getCanonicalName()+" on left side. But, found key of type "+ idxKeyClass.getCanonicalName()+" in index built for right side.";
                     throw new ExecException(errMsg,errCode,PigException.BUG);
                 }
-            } 
+            }
         }
-        
+
         //add remaining split indexes to splitsAhead array
         int [] splitsAhead = new int[index.size()];
         int splitsAheadIdx = 0;
         for(Tuple t : index){
             splitsAhead[splitsAheadIdx++] = (Integer) t.get( t.size()-1 );
         }
-        
+
         initRightLoader(splitsAhead);
     }
-    
+
     private void initRightLoader(int [] splitsToBeRead) throws IOException{
-        PigContext pc = (PigContext) ObjectSerializer
-                .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.pigContext"));
-        
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        
+        Properties properties = (Properties) ObjectSerializer
+                .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.client.sys.props"));
+
+        Configuration conf = ConfigurationUtil.toConfiguration(properties);
+
         // Hadoop security need this property to be set
         if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
-            conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY, 
+            conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY,
                     System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
         }
-        
+
         //create ReadToEndLoader that will read the given splits in order
         loader = new ReadToEndLoader((LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec),
                 conf, inpLocation, splitsToBeRead);
@@ -216,7 +216,7 @@ public class DefaultIndexableLoader exte
 
         if(idxTupSize == 3)
             return idxTuple.get(0);
-        
+
         int numColsInKey = (idxTupSize - 2);
         List<Object> list = new ArrayList<Object>(numColsInKey);
         for(int i=0; i < numColsInKey; i++)
@@ -228,13 +228,13 @@ public class DefaultIndexableLoader exte
     private OperatorKey genKey(){
         return new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope));
     }
-    
+
     @Override
     public Tuple getNext() throws IOException {
         Tuple t = loader.getNext();
         return t;
     }
-    
+
     @Override
     public void close() throws IOException {
     }
@@ -242,14 +242,14 @@ public class DefaultIndexableLoader exte
     @Override
     public void initialize(Configuration conf) throws IOException {
         // nothing to do
-        
+
     }
 
     @Override
     public InputFormat getInputFormat() throws IOException {
         throw new UnsupportedOperationException();
     }
-    
+
     @Override
     public LoadCaster getLoadCaster() throws IOException {
         throw new UnsupportedOperationException();
@@ -264,7 +264,7 @@ public class DefaultIndexableLoader exte
     public void setLocation(String location, Job job) throws IOException {
         // nothing to do
     }
-    
+
     public void setIndexFile(String indexFile) {
         this.indexFile = indexFile;
     }

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java Fri Feb 24 08:19:42 2017
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -42,7 +43,7 @@ public class GFCross extends EvalFunc<Da
     private BagFactory mBagFactory = BagFactory.getInstance();
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
     private int parallelism = 0;
-    private Random r = new Random();
+    private Random r;
     private String crossKey;
 
     static private final int DEFAULT_PARALLELISM = 96;
@@ -69,6 +70,14 @@ public class GFCross extends EvalFunc<Da
                 if (parallelism < 0) {
                     throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey  + " was " + parallelism);
                 }
+                long taskIdHashCode = cfg.get(MRConfiguration.TASK_ID).hashCode();
+                long seed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
+                r = new Random(seed);
+            } else {
+                // Don't see a case where cfg can be null.
+                // But there is an existing testcase TestGFCross.testDefault
+                // Using constant generated from task_14738102975522_0001_r_000000 hashcode
+                r = new Random(-4235927512599300514L);
             }
 
             numInputs = (Integer)input.get(0);

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Fri Feb 24 08:19:42 2017
@@ -90,7 +90,9 @@ public class PoissonSampleLoader extends
             // number of tuples to be skipped
             Tuple t = loader.getNext();
             if(t == null) {
-                return createNumRowTuple(null);
+                // since skipInterval is -1, no previous sample,
+                // and next sample is null -> the data set is empty
+                return null;
             }
             long availRedMem = (long) ( totalMemory * heapPerc);
             // availRedMem = 155084396;

Modified: pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java Fri Feb 24 08:19:42 2017
@@ -57,6 +57,8 @@ public class NullableTuple extends PigNu
     public void readFields(DataInput in) throws IOException {
         boolean nullness = in.readBoolean();
         setNull(nullness);
+        // Free up the previous value for GC
+        mValue = null;
         if (!nullness) {
             mValue = bis.readTuple(in);
         }

Modified: pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java Fri Feb 24 08:19:42 2017
@@ -102,7 +102,7 @@ public class PigFile {
         if(oc.needsTaskCommit(tac)) {
             oc.commitTask(tac);
         }
-        HadoopShims.commitOrCleanup(oc, jc);
+        oc.commitJob(jc);
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java Fri Feb 24 08:19:42 2017
@@ -40,17 +40,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
  * This is wrapper Loader which wraps a real LoadFunc underneath and allows
- * to read a file completely starting a given split (indicated by a split index 
+ * to read a file completely starting a given split (indicated by a split index
  * which is used to look in the List<InputSplit> returned by the underlying
  * InputFormat's getSplits() method). So if the supplied split index is 0, this
  * loader will read the entire file. If it is non zero it will read the partial
  * file beginning from that split to the last split.
- * 
+ *
  * The call sequence to use this is:
  * 1) construct an object using the constructor
  * 2) Call getNext() in a loop till it returns null
@@ -61,52 +60,50 @@ public class ReadToEndLoader extends Loa
      * the wrapped LoadFunc which will do the actual reading
      */
     private LoadFunc wrappedLoadFunc;
-    
+
     /**
      * the Configuration object used to locate the input location - this will
      * be used to call {@link LoadFunc#setLocation(String, Job)} on
      * the wrappedLoadFunc
      */
     private Configuration conf;
-    
+
     /**
      * the input location string (typically input file/dir name )
      */
     private String inputLocation;
-      
+
     /**
      * If the splits to be read are not in increasing sequence of integers
      * this array can be used
      */
     private int[] toReadSplits = null;
-    
+
     /**
      * index into toReadSplits
      */
     private int toReadSplitsIdx = 0;
-    
+
     /**
      * the index of the split the loader is currently reading from
      */
     private int curSplitIndex;
-    
+
     /**
      * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)}
      */
     private List<InputSplit> inpSplits = null;
-    
+
     /**
      * underlying RecordReader
      */
     private RecordReader reader = null;
-    
+
     /**
      * underlying InputFormat
      */
     private InputFormat inputFormat = null;
-    
-    private PigContext pigContext;
-    
+
     private String udfContextSignature = null;
 
     /**
@@ -114,8 +111,8 @@ public class ReadToEndLoader extends Loa
      * @param conf
      * @param inputLocation
      * @param splitIndex
-     * @throws IOException 
-     * @throws InterruptedException 
+     * @throws IOException
+     * @throws InterruptedException
      */
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int splitIndex) throws IOException {
@@ -125,17 +122,7 @@ public class ReadToEndLoader extends Loa
         this.curSplitIndex = splitIndex;
         init();
     }
-    
-    public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
-            String inputLocation, int splitIndex, PigContext pigContext) throws IOException {
-        this.wrappedLoadFunc = wrappedLoadFunc;
-        this.inputLocation = inputLocation;
-        this.conf = conf;
-        this.curSplitIndex = splitIndex;
-        this.pigContext = pigContext;
-        init();
-    }
-    
+
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int splitIndex, String signature) throws IOException {
         this.udfContextSignature = signature;
@@ -147,14 +134,14 @@ public class ReadToEndLoader extends Loa
     }
 
     /**
-     * This constructor takes an array of split indexes (toReadSplitIdxs) of the 
+     * This constructor takes an array of split indexes (toReadSplitIdxs) of the
      * splits to be read.
      * @param wrappedLoadFunc
      * @param conf
      * @param inputLocation
      * @param toReadSplitIdxs
-     * @throws IOException 
-     * @throws InterruptedException 
+     * @throws IOException
+     * @throws InterruptedException
      */
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int[] toReadSplitIdxs) throws IOException {
@@ -166,21 +153,21 @@ public class ReadToEndLoader extends Loa
             toReadSplitIdxs.length > 0 ? toReadSplitIdxs[0] : Integer.MAX_VALUE;
         init();
     }
-    
+
     @SuppressWarnings("unchecked")
     private void init() throws IOException {
-        if (conf != null && pigContext != null) {
-            SchemaTupleBackend.initialize(conf, pigContext, true);
+        if (conf != null) {
+            SchemaTupleBackend.initialize(conf, true);
         }
 
         // make a copy so that if the underlying InputFormat writes to the
         // conf, we don't affect the caller's copy
         conf = new Configuration(conf);
 
-        // let's initialize the wrappedLoadFunc 
+        // let's initialize the wrappedLoadFunc
         Job job = new Job(conf);
         wrappedLoadFunc.setUDFContextSignature(this.udfContextSignature);
-        wrappedLoadFunc.setLocation(inputLocation, 
+        wrappedLoadFunc.setLocation(inputLocation,
                 job);
         // The above setLocation call could write to the conf within
         // the job - get a hold of the modified conf
@@ -191,10 +178,10 @@ public class ReadToEndLoader extends Loa
                     new JobID()));
         } catch (InterruptedException e) {
             throw new IOException(e);
-        }        
+        }
     }
 
-    private boolean initializeReader() throws IOException, 
+    private boolean initializeReader() throws IOException,
     InterruptedException {
         // Close the previous reader first
         if(reader != null) {
@@ -206,14 +193,14 @@ public class ReadToEndLoader extends Loa
             return false;
         }
         InputSplit curSplit = inpSplits.get(curSplitIndex);
-        TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf, 
+        TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf,
                 new TaskAttemptID());
         reader = inputFormat.createRecordReader(curSplit, tAContext);
         reader.initialize(curSplit, tAContext);
         // create a dummy pigsplit - other than the actual split, the other
         // params are really not needed here where we are just reading the
         // input completely
-        PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1, 
+        PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1,
                 new ArrayList<OperatorKey>(), -1);
         // Set the conf object so that if the wrappedLoadFunc uses it,
         // it won't be null
@@ -244,7 +231,7 @@ public class ReadToEndLoader extends Loa
             throw new IOException(e);
         }
     }
-    
+
     private Tuple getNextHelper() throws IOException, InterruptedException {
         Tuple t = null;
         while(initializeReader()) {
@@ -258,8 +245,8 @@ public class ReadToEndLoader extends Loa
         }
         return null;
     }
-    
-    
+
+
     /**
      * Updates curSplitIndex , just increment if splitIndexes is null,
      * else get next split in splitIndexes
@@ -331,7 +318,7 @@ public class ReadToEndLoader extends Loa
              ((LoadMetadata) wrappedLoadFunc).setPartitionFilter(partitionFilter);
         }
     }
-    
+
     @Override
     public void setUDFContextSignature(String signature) {
         this.udfContextSignature = signature;

Modified: pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java Fri Feb 24 08:19:42 2017
@@ -20,43 +20,78 @@ package org.apache.pig.impl.plan;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Generates IDs as long values in a thread safe manner. Each thread has its own generated IDs.
+ */
 public class NodeIdGenerator {
 
-    private Map<String, Long> scopeToIdMap;
-    private static NodeIdGenerator theGenerator = new NodeIdGenerator();
-
-    private NodeIdGenerator() {
-        scopeToIdMap = new HashMap<String, Long>();
-    }
-
+	/**
+	 * Holds a map of generated scoped-IDs per thread. Each map holds generated IDs per scope.
+	 */
+    private ThreadLocal<Map<String, AtomicLong>> scopeToIdMap
+        = new ThreadLocal<Map<String, AtomicLong>>() {
+            protected Map<String, AtomicLong> initialValue() {
+                return new HashMap<String,AtomicLong>();
+            }
+        };
+
+    /**
+     * Singleton instance.
+     */
+    private static final NodeIdGenerator theGenerator = new NodeIdGenerator();
+
+    /**
+     * Private default constructor to force singleton use-case of this class.
+     */
+    private NodeIdGenerator() {}
+
+    /**
+     * Returns the NodeIdGenerator singleton.
+     * @return
+     */
     public static NodeIdGenerator getGenerator() {
         return theGenerator;
     }
 
-    public long getNextNodeId(String scope) {
-        Long val = scopeToIdMap.get(scope);
-
-        long nextId = 0;
-
-        if (val != null) {
-            nextId = val.longValue();
-        }
-
-        scopeToIdMap.put(scope, nextId + 1);
-
-        return nextId;
+    /**
+     * Returns the next ID to be used for the current Thread.
+     * 
+     * @param scope
+     * @return
+     */
+    public long getNextNodeId(final String scope) {
+        // ThreadLocal usage protects us from having the same HashMap instance
+        // being used by several threads, so we can use it without synchronized
+        // blocks and still be thread-safe.
+        Map<String, AtomicLong> map = scopeToIdMap.get();
+
+        // the concurrent properties of the AtomicLong are useless here but
+        // since it cost less to use such an object rather than created a
+        // Long object instance each time we increment a counter ...
+        AtomicLong l = map.get(scope);
+        if ( l == null )
+            map.put( scope, l = new AtomicLong() );
+        return l.getAndIncrement();
     }
 
+    /**
+     * Reset the given scope IDs to 0 for the current Thread.
+     * @param scope
+     */
     @VisibleForTesting
-    public static void reset(String scope) {
-        theGenerator.scopeToIdMap.put(scope, 0L) ;
+    public static void reset(final String scope) {
+        theGenerator.scopeToIdMap.get().remove(scope);
     }
 
+    /**
+     * Reset all scope IDs to 0 for the current Thread.
+     */
     @VisibleForTesting
     public static void reset() {
-        theGenerator.scopeToIdMap.clear();
+        theGenerator.scopeToIdMap.remove();
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java Fri Feb 24 08:19:42 2017
@@ -150,12 +150,13 @@ public class ExecutableManager {
 
         LOG.debug("Process exited with: " + exitCode);
         if (exitCode != SUCCESS) {
-            LOG.error(command + " failed with exit status: "
-                    + exitCode);
+            String errMsg = "'" + command.toString() + "'" + " failed with exit status: " + exitCode;
+            LOG.error(errMsg);
+            Result res = new Result(POStatus.STATUS_ERR, errMsg);
+            sendOutput(poStream.getBinaryOutputQueue(), res);
         }
 
-        if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
-
+        if (exitCode == SUCCESS && outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
             // Trigger the outputHandler
             outputHandler.bindTo("", null, 0, -1);
 
@@ -178,10 +179,18 @@ public class ExecutableManager {
      * @param process the process to be killed
      * @throws IOException
      */
-    private void killProcess(Process process) throws IOException {
+    private void killProcess(Process process) {
         if (process != null) {
-            inputHandler.close(process);
-            outputHandler.close();
+            try {
+                inputHandler.close(process);
+            } catch (Exception e) {
+                LOG.info("Exception in killProcess while closing inputHandler. Ignoring:" + e.getMessage());
+            }
+            try {
+                outputHandler.close();
+            } catch (Exception e) {
+                LOG.info("Exception in killProcess while closing outputHandler. Ignoring:" + e.getMessage());
+            }
             process.destroy();
         }
     }
@@ -334,7 +343,7 @@ public class ExecutableManager {
                                 // we will only call close() here and not
                                 // worry about deducing whether the process died
                                 // normally or abnormally - if there was any real
-                                // issue the ProcessOutputThread should see
+                                // issue we should see
                                 // a non zero exit code from the process and send
                                 // a POStatus.STATUS_ERR back - what if we got
                                 // an IOException because there was only an issue with
@@ -344,14 +353,6 @@ public class ExecutableManager {
                                 return;
                             } else {
                                 // asynchronous case - then this is a real exception
-                                LOG.error("Exception while trying to write to stream binary's input", e);
-                                // send POStatus.STATUS_ERR to POStream to signal the error
-                                // Generally the ProcessOutputThread would do this but now
-                                // we should do it here since neither the process nor the
-                                // ProcessOutputThread will ever be spawned
-                                Result res = new Result(POStatus.STATUS_ERR,
-                                        "Exception while trying to write to stream binary's input" + e.getMessage());
-                                sendOutput(poStream.getBinaryOutputQueue(), res);
                                 throw e;
                             }
                         }
@@ -362,13 +363,13 @@ public class ExecutableManager {
             } catch (Throwable t) {
                 // Note that an error occurred
                 outerrThreadsError = t;
-                LOG.error( "Error while reading from POStream and " +
-                           "passing it to the streaming process", t);
-                try {
-                    killProcess(process);
-                } catch (IOException ioe) {
-                    LOG.warn(ioe);
-                }
+                Result res = new Result(POStatus.STATUS_ERR,
+                                        "Error while reading from POStream and " +
+                                        "passing it to the streaming process:" + t.getMessage());
+                LOG.error("Error while reading from POStream and " +
+                          "passing it to the streaming process:", t);
+                sendOutput(poStream.getBinaryOutputQueue(), res);
+                killProcess(process);
             }
         }
     }
@@ -452,13 +453,7 @@ public class ExecutableManager {
                 try {
                     exitCode = process.waitFor();
                 } catch (InterruptedException ie) {
-                    try {
-                        killProcess(process);
-                    } catch (IOException e) {
-                        LOG.warn("Exception trying to kill process while processing null output " +
-                                "from binary", e);
-
-                    }
+                    killProcess(process);
                     // signal error
                     String errMsg = "Failure while waiting for process (" + command.toString() + ")" +
                             ie.getMessage();

Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java Fri Feb 24 08:19:42 2017
@@ -175,8 +175,10 @@ public abstract class OutputHandler {
      */
     public synchronized void close() throws IOException {
         if(!alreadyClosed) {
-            istream.close();
-            istream = null;
+            if( istream != null ) {
+                istream.close();
+                istream = null;
+            }
             alreadyClosed = true;
         }
     }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java Fri Feb 24 08:19:42 2017
@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.tools.bzip2r.BZip2Constants;
 import org.joda.time.DateTime;
@@ -66,7 +65,6 @@ public class JarManager {
         BZIP2R(BZip2Constants.class),
         AUTOMATON(Automaton.class),
         ANTLR(CommonTokenStream.class),
-        GUAVA(Multimaps.class),
         JODATIME(DateTime.class);
 
         private final Class pkgClass;
@@ -208,11 +206,8 @@ public class JarManager {
     public static List<String> getDefaultJars() {
         List<String> defaultJars = new ArrayList<String>();
         for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
-            if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) {
-                continue; //Skip
-            }
             String jar = findContainingJar(pkgToSend.getPkgClass());
-            if (!defaultJars.contains(jar)) {
+            if (jar != null && !defaultJars.contains(jar)) {
                 defaultJars.add(jar);
             }
         }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri Feb 24 08:19:42 2017
@@ -51,10 +51,9 @@ public class SpillableMemoryManager impl
 
     private static final Log log = LogFactory.getLog(SpillableMemoryManager.class);
 
-    private static final int ONE_GB = 1024 * 1024 * 1024;
     private static final int UNUSED_MEMORY_THRESHOLD_DEFAULT = 350 * 1024 * 1024;
-    private static final double MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7;
-    private static final double COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7;
+    private static final float MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7f;
+    private static final float COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7f;
 
     private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
     // References to spillables with size
@@ -86,7 +85,7 @@ public class SpillableMemoryManager impl
 
     // fraction of the total heap used for the threshold to determine
     // if we want to perform an extra gc before the spill
-    private double extraGCThresholdFraction = 0.05;
+    private float extraGCThresholdFraction = 0.05f;
     private long extraGCSpillSizeThreshold  = 0L;
 
     private volatile boolean blockRegisterOnSpill = false;
@@ -142,7 +141,7 @@ public class SpillableMemoryManager impl
      * @param unusedMemoryThreshold
      *            Unused memory size below which we want to get notifications
      */
-    private void configureMemoryThresholds(double memoryThresholdFraction, double collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
+    private void configureMemoryThresholds(float memoryThresholdFraction, float collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
         long tenuredHeapSize = tenuredHeap.getUsage().getMax();
         memoryThresholdSize = (long)(tenuredHeapSize * memoryThresholdFraction);
         collectionThresholdSize = (long)(tenuredHeapSize * collectionMemoryThresholdFraction);
@@ -184,8 +183,8 @@ public class SpillableMemoryManager impl
 
         spillFileSizeThreshold = conf.getLong("pig.spill.size.threshold", spillFileSizeThreshold);
         gcActivationSize = conf.getLong("pig.spill.gc.activation.size", gcActivationSize);
-        double memoryThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT);
-        double collectionThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT);
+        float memoryThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT);
+        float collectionThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT);
         long unusedMemoryThreshold = conf.getLong(PigConfiguration.PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE, UNUSED_MEMORY_THRESHOLD_DEFAULT);
         configureMemoryThresholds(memoryThresholdFraction, collectionThresholdFraction, unusedMemoryThreshold);
     }
@@ -199,7 +198,7 @@ public class SpillableMemoryManager impl
         // used - heapmax/2 + heapmax/4
         long toFree = 0L;
         if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
-            toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);
+            toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call- Usage threshold "
@@ -211,7 +210,7 @@ public class SpillableMemoryManager impl
                 log.debug(msg);
             }
         } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
-            toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);
+            toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call - Collection threshold "

Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Fri Feb 24 08:19:42 2017
@@ -48,6 +48,7 @@ import org.apache.hadoop.io.compress.BZi
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
@@ -93,18 +94,10 @@ public class Utils {
     	  return System.getProperty("java.vendor").contains("IBM");
     }
 
-    public static boolean isHadoop23() {
-        String version = org.apache.hadoop.util.VersionInfo.getVersion();
-        if (version.matches("\\b0\\.23\\..+\\b"))
-            return true;
-        return false;
-    }
-
-    public static boolean isHadoop2() {
-        String version = org.apache.hadoop.util.VersionInfo.getVersion();
-        if (version.matches("\\b2\\.\\d+\\..+"))
-            return true;
-        return false;
+    public static boolean is64bitJVM() {
+        String arch = System.getProperties().getProperty("sun.arch.data.model",
+                System.getProperty("com.ibm.vm.bitmode"));
+        return arch != null && arch.equals("64");
     }
 
     /**
@@ -574,6 +567,11 @@ public class Utils {
         return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
     }
 
+    public static boolean isLocal(Configuration conf) {
+        return conf.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)
+                || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
+    }
+
     // PIG-3929 use parameter substitution for pig properties similar to Hadoop Configuration
     // Following code has been borrowed from Hadoop's Configuration#substituteVars
     private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
@@ -697,4 +695,15 @@ public class Utils {
             DateTimeZone.setDefault(DateTimeZone.forID(dtzStr));
         }
     }
+
+    /**
+     * Add shutdown hook that runs before the FileSystem cache shutdown happens.
+     *
+     * @param hook code to execute during shutdown
+     * @param priority Priority over the  FileSystem.SHUTDOWN_HOOK_PRIORITY
+     */
+    public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+        ShutdownHookManager.get().addShutdownHook(hook,
+                FileSystem.SHUTDOWN_HOOK_PRIORITY + priority);
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Fri Feb 24 08:19:42 2017
@@ -118,6 +118,8 @@ public class AvroStorageDataConversionUt
         return ByteBuffer.wrap(((DataByteArray) o).get());
       case FIXED:
         return new GenericData.Fixed(s, ((DataByteArray) o).get());
+      case ENUM:
+        return new GenericData.EnumSymbol(s,o.toString());
       default:
         if (DataType.findType(o) == DataType.DATETIME) {
           return ((DateTime) o).getMillis();

Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Fri Feb 24 08:19:42 2017
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -49,6 +50,7 @@ import java.util.Map;
 public final class AvroTupleWrapper <T extends IndexedRecord>
     implements Tuple {
     private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
+    private TupleFactory mTupleFactory = TupleFactory.getInstance();
 
   /**
    * The Avro object wrapped in the pig Tuple.
@@ -64,9 +66,9 @@ public final class AvroTupleWrapper <T e
   }
 
   @Override
-  public void write(final DataOutput o) throws IOException {
-    throw new IOException(
-        this.getClass().toString() + ".write called, but not implemented yet");
+  public void write(DataOutput out) throws IOException {
+      Tuple t = mTupleFactory.newTupleNoCopy(getAll());
+      t.write(out);
   }
 
   @SuppressWarnings("rawtypes")

Modified: pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java Fri Feb 24 08:19:42 2017
@@ -98,13 +98,17 @@ public abstract class FilterExtractor {
     public void visit() throws FrontendException {
         // we will visit the leaf and it will recursively walk the plan
         LogicalExpression leaf = (LogicalExpression)originalPlan.getSources().get( 0 );
-        // if the leaf is a unary operator it should be a FilterFunc in
-        // which case we don't try to extract partition filter conditions
-        if(leaf instanceof BinaryExpression) {
-            // recursively traverse the tree bottom up
-            // checkPushdown returns KeyState which is pair of LogicalExpression
-            BinaryExpression binExpr = (BinaryExpression)leaf;
-            KeyState finale = checkPushDown(binExpr);
+
+        // recursively traverse the tree bottom up
+        // checkPushdown returns KeyState which is pair of LogicalExpression
+        KeyState finale = null;
+        if (leaf instanceof BinaryExpression) {
+            finale = checkPushDown((BinaryExpression) leaf);
+        } else if (leaf instanceof UnaryExpression) {
+            finale = checkPushDown((UnaryExpression) leaf);
+        }
+
+        if (finale != null) {
             this.filterExpr = finale.filterExpr;
             this.pushdownExpr = getExpression(finale.pushdownExpr);
         }
@@ -278,12 +282,22 @@ public abstract class FilterExtractor {
             if (unaryExpr instanceof CastExpression) {
                 return checkPushDown(unaryExpr.getExpression());
             }
-            if (unaryExpr instanceof IsNullExpression) {
-                state.pushdownExpr = unaryExpr;
-                state.filterExpr = null;
-            } else if (unaryExpr instanceof NotExpression) {
-                state.pushdownExpr = unaryExpr;
-                state.filterExpr = null;
+            // For IsNull, the child may not be a supported expression, e.g. MapLookupExpression.
+            // For NotExpression, the child, C, is broken into expressions P and F such that C = P AND F
+            // Consequently, NOT C = NOT P OR NOT F, which can't be expressed as an AND so both must be
+            // pushed or both used as a filter.
+            // For both cases, this expr can be pushed if and only if the entire child can be.
+            if (unaryExpr instanceof IsNullExpression || unaryExpr instanceof NotExpression) {
+                KeyState childState = checkPushDown(unaryExpr.getExpression());
+                if (childState.filterExpr == null) {
+                    // only push down if the entire expression can be pushed
+                    state.pushdownExpr = unaryExpr;
+                    state.filterExpr = null;
+                } else {
+                    removeFromFilteredPlan(childState.filterExpr);
+                    state.filterExpr = addToFilterPlan(unaryExpr);
+                    state.pushdownExpr = null;
+                }
             } else {
                 state.filterExpr = addToFilterPlan(unaryExpr);
                 state.pushdownExpr = null;

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Fri Feb 24 08:19:42 2017
@@ -323,6 +323,7 @@ public class ExpToPhyTranslationVisitor
     public void visit( CastExpression op ) throws FrontendException {
         POCast pCast = new POCast(new OperatorKey(DEFAULT_SCOPE, nodeGen
                 .getNextNodeId(DEFAULT_SCOPE)));
+        pCast.addOriginalLocation(op.getFieldSchema().alias, op.getLocation()) ;
 //        physOp.setAlias(op.getAlias());
         currentPlan.add(pCast);
 

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java Fri Feb 24 08:19:42 2017
@@ -95,7 +95,8 @@ public class MapLookupExpression extends
         LogicalFieldSchema predFS = successor.getFieldSchema();
         if (predFS!=null) {
             if (predFS.type==DataType.MAP && predFS.schema!=null) {
-                return (predFS.schema.getField(0));
+                fieldSchema = predFS.schema.getField(0);
+                return fieldSchema;
             }
             else {
                 fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Fri Feb 24 08:19:42 2017
@@ -37,6 +37,7 @@ public class LOGenerate extends LogicalR
      // to store uid in mUserDefinedSchema
      private List<LogicalSchema> mUserDefinedSchema = null;
      private List<LogicalSchema> outputPlanSchemas = null;
+     private List<LogicalSchema> expSchemas = null;
      // If LOGenerate generate new uid, cache it here.
      // This happens when expression plan does not have complete schema, however,
      // user give complete schema in ForEach statement in script
@@ -71,6 +72,7 @@ public class LOGenerate extends LogicalR
         
         schema = new LogicalSchema();
         outputPlanSchemas = new ArrayList<LogicalSchema>();
+        expSchemas = new ArrayList<LogicalSchema>();
         
         for(int i=0; i<outputPlans.size(); i++) {
             LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
@@ -93,19 +95,17 @@ public class LOGenerate extends LogicalR
                 fieldSchema = exp.getFieldSchema().deepCopy();
                 
                 expSchema = new LogicalSchema();
-                if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG)||!flattenFlags[i]) {
+                if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG && fieldSchema.type != DataType.MAP) || !flattenFlags[i]) {
                     // if type is primitive, just add to schema
-                    if (fieldSchema!=null)
+                    if (fieldSchema != null)
                         expSchema.addField(fieldSchema);
-                    else
-                        expSchema = null;
                 } else {
-                    // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator
+                    // if bag/tuple/map don't have inner schema, after flatten, we don't have schema for the entire operator
                     if (fieldSchema.schema==null) {
                         expSchema = null;
                     }
                     else {
-                     // if we come here, we get a BAG/Tuple with flatten, extract inner schema of the tuple as expSchema
+                     // if we come here, we get a BAG/Tuple/Map with flatten, extract inner schema of the tuple as expSchema
                         List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
                         if (flattenFlags[i]) {
                             if (fieldSchema.type == DataType.BAG) {
@@ -117,13 +117,23 @@ public class LOGenerate extends LogicalR
                                         fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                     }
                                 }
+                            } else if (fieldSchema.type == DataType.MAP) {
+                                //should only contain 1 schemafield for Map's value
+                                innerFieldSchemas = fieldSchema.schema.getFields();
+                                LogicalSchema.LogicalFieldSchema fsForValue = innerFieldSchemas.get(0);
+                                fsForValue.alias = fieldSchema.alias + "::value";
+
+                                LogicalSchema.LogicalFieldSchema fsForKey = new LogicalFieldSchema(
+                                        fieldSchema.alias + "::key" , null, DataType.CHARARRAY, fieldSchema.uid);
+
+                                expSchema.addField(fsForKey);
                             } else { // DataType.TUPLE
                                 innerFieldSchemas = fieldSchema.schema.getFields();
                                 for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
                                     fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                 }
                             }
-                            
+
                             for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
                                 expSchema.addField(fs);
                         }
@@ -137,6 +147,7 @@ public class LOGenerate extends LogicalR
             if (expSchema!=null && expSchema.size()==0)
                 expSchema = null;
             LogicalSchema planSchema = new LogicalSchema();
+            expSchemas.add(expSchema);
             if (mUserDefinedSchemaCopy!=null) {
                 LogicalSchema mergedSchema = new LogicalSchema();
                 // merge with userDefinedSchema
@@ -146,12 +157,6 @@ public class LOGenerate extends LogicalR
                         fs.stampFieldSchema();
                         mergedSchema.addField(new LogicalFieldSchema(fs));
                     }
-                    for (LogicalFieldSchema fs : mergedSchema.getFields()) {
-                        if (fs.type == DataType.NULL){
-                            //this is the use case where a new alias has been specified by user
-                            fs.type = DataType.BYTEARRAY;
-                        }
-                    }
                 } else {
 
                     // Merge uid with the exp field schema
@@ -163,8 +168,12 @@ public class LOGenerate extends LogicalR
                     mergedSchema.mergeUid(expSchema);
 
                 }
-                for (LogicalFieldSchema fs : mergedSchema.getFields())
+                for (LogicalFieldSchema fs : mergedSchema.getFields()) {
+                    if (fs.type==DataType.NULL) {
+                        fs.type = DataType.BYTEARRAY;
+                    }
                     planSchema.addField(fs);
+                }
             } else {
                 // if any plan do not have schema, the whole LOGenerate do not have schema
                 if (expSchema==null) {
@@ -310,4 +319,8 @@ public class LOGenerate extends LogicalR
         super.resetSchema();
         outputPlanSchemas = null;
     }
+
+    public List<LogicalSchema> getExpSchemas() {
+        return expSchemas;
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java Fri Feb 24 08:19:42 2017
@@ -38,6 +38,7 @@ public class LOJoin extends LogicalRelat
      */
     public static enum JOINTYPE {
         HASH,    // Hash Join
+        BLOOM,   // Bloom Join
         REPLICATED, // Fragment Replicated join
         SKEWED, // Skewed Join
         MERGE,   // Sort Merge Join

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Feb 24 08:19:42 2017
@@ -1414,7 +1414,7 @@ public class LogToPhyTranslationVisitor
 
             return;
         }
-        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
+        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH || loj.getJoinType() == LOJoin.JOINTYPE.BLOOM){
             POPackage poPackage = compileToLR_GR_PackTrio(loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans());
             POForEach fe = compileFE4Flattening(innerFlags,  scope, parallel, alias, location, inputs);
             currentPlan.add(fe);
@@ -1425,7 +1425,20 @@ public class LogToPhyTranslationVisitor
                         e.getErrorCode(),e.getErrorSource(),e);
             }
             logToPhyMap.put(loj, fe);
-            poPackage.getPkgr().setPackageType(PackageType.JOIN);
+            if (loj.getJoinType() == LOJoin.JOINTYPE.BLOOM) {
+                if (innerFlags.length == 2) {
+                    if (innerFlags[0] == false && innerFlags[1] == false) {
+                        throw new LogicalToPhysicalTranslatorException(
+                                "Error at " + loj.getLocation() + " with alias "+ loj.getAlias() +
+                                        ". Bloom join cannot be used with a FULL OUTER join.",
+                                1109,
+                                PigException.INPUT);
+                    }
+                }
+                poPackage.getPkgr().setPackageType(PackageType.BLOOMJOIN);
+            } else {
+                poPackage.getPkgr().setPackageType(PackageType.JOIN);
+            }
         }
         translateSoftLinks(loj);
     }

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Fri Feb 24 08:19:42 2017
@@ -48,6 +48,7 @@ import org.apache.pig.newplan.logical.vi
 import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
 import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover;
 import org.apache.pig.newplan.logical.visitor.DuplicateForEachColumnRewriteVisitor;
+import org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor;
 import org.apache.pig.newplan.logical.visitor.ImplicitSplitInsertVisitor;
 import org.apache.pig.newplan.logical.visitor.InputOutputFileValidatorVisitor;
 import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
@@ -175,6 +176,7 @@ public class LogicalPlan extends BaseOpe
         new ColumnAliasConversionVisitor(this).visit();
         new SchemaAliasVisitor(this).visit();
         new ScalarVisitor(this, pigContext, scope).visit();
+        new ForEachUserSchemaVisitor(this).visit();
 
         // ImplicitSplitInsertVisitor has to be called before
         // DuplicateForEachColumnRewriteVisitor.  Detail at pig-1766
@@ -189,6 +191,15 @@ public class LogicalPlan extends BaseOpe
 
         new TypeCheckingRelVisitor( this, collector).visit();
 
+
+        new UnionOnSchemaSetter(this).visit();
+        new CastLineageSetter(this, collector).visit();
+        new ScalarVariableValidator(this).visit();
+        new StoreAliasSetter(this).visit();
+
+        // compute whether output data is sorted or not
+        new SortInfoSetter(this).visit();
+
         boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
 
         if(aggregateWarning) {
@@ -199,14 +210,6 @@ public class LogicalPlan extends BaseOpe
             }
         }
 
-        new UnionOnSchemaSetter(this).visit();
-        new CastLineageSetter(this, collector).visit();
-        new ScalarVariableValidator(this).visit();
-        new StoreAliasSetter(this).visit();
-
-        // compute whether output data is sorted or not
-        new SortInfoSetter(this).visit();
-
         if (!(skipInputOutputValidation || pigContext.inExplain || pigContext.inDumpSchema)) {
             // Validate input/output file
             new InputOutputFileValidatorVisitor(this, pigContext).visit();

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Fri Feb 24 08:19:42 2017
@@ -150,6 +150,39 @@ public class LogicalSchema {
             }
             return true;
         }
+
+        // Check if fs1 is equal to fs2 with regard to type
+        public static boolean typeMatch(LogicalFieldSchema fs1, LogicalFieldSchema fs2) {
+            if (fs1==null && fs2==null) {
+                return true;
+            }
+            if (fs1==null || fs2==null) {
+                return false;
+            }
+            if (fs1.type!=fs2.type) {
+                return false;
+            }
+            if (DataType.isComplex(fs1.type)) {
+                LogicalSchema s1 = fs1.schema;
+                LogicalSchema s2 = fs2.schema;
+                if (s1==null && s2==null) {
+                    return true;
+                }
+                if (fs1==null || fs2==null) {
+                    return false;
+                }
+                if (s1.size()!=s2.size()) {
+                    return false;
+                }
+                for (int i=0;i<s1.size();i++) {
+                    if (!typeMatch(s1.getField(i), s2.getField(i))) {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
         /**
          * Adds the uid from FieldSchema argument to this FieldSchema
          * If the argument is null, it stamps this FieldSchema with uid
@@ -447,7 +480,23 @@ public class LogicalSchema {
             LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
             return mergedFS;
         }
-        
+
+        public static boolean isEqualUnlessUnknown(LogicalFieldSchema fs1, LogicalFieldSchema fs2) throws FrontendException {
+            if (fs1.type == DataType.BYTEARRAY) {
+                return true;
+            } else if (fs2.type == DataType.BYTEARRAY) {
+                return true;
+            } else if (fs1.type == fs2.type) {
+                if (DataType.isComplex(fs1.type)) {
+                    return LogicalSchema.isEqualUnlessUnknown(fs1.schema, fs2.schema);
+                } else {
+                    return true;
+                }
+            } else {
+                return false;
+            }
+        }
+
         /***
          * Old Pig field schema does not require a tuple schema inside a bag;
          * Now it is required to have that; this method is to fill the gap
@@ -770,7 +819,24 @@ public class LogicalSchema {
         }
         return mergedSchema;
     }
-    
+
+    public static boolean isEqualUnlessUnknown(LogicalSchema s1, LogicalSchema s2) throws FrontendException {
+        if (s1 == null) {
+            return true;
+        } else if (s2 == null) {
+            return true;
+        } else if (s1.size() != s2.size()) {
+            return false;
+        } else {
+            for (int i=0;i<s1.size();i++) {
+                if (!LogicalFieldSchema.isEqualUnlessUnknown(s1.getField(i), s1.getField(i))) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
     public String toString(boolean verbose) {
         StringBuilder str = new StringBuilder();
         

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java Fri Feb 24 08:19:42 2017
@@ -95,7 +95,7 @@ public class AddForEach extends WholePla
             }
             
             Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
-            if (outputUids==null)
+            if (outputUids==null || outputUids.size() == 0 )
                 return false;
             
             LogicalSchema schema = op.getSchema();

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java Fri Feb 24 08:19:42 2017
@@ -107,7 +107,7 @@ public class CastLineageSetter extends A
                 if(inLoadFunc == null){
                     String msg = "Cannot resolve load function to use for casting from " + 
                                 DataType.findTypeName(inType) + " to " +
-                                DataType.findTypeName(outType) + ". ";
+                                DataType.findTypeName(outType) + " at " + cast.getLocation() ;
                     msgCollector.collect(msg, MessageType.Warning,
                            PigWarning.NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY);
                 }else {