You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 03:34:40 UTC

svn commit: r1784224 [8/17] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java Fri Feb 24 03:34:37 2017
@@ -29,7 +29,7 @@ import org.apache.pig.classification.Int
 
 /**
  * Common functionality for proactively spilling bags that need to keep the data
- * sorted.
+ * sorted. 
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -54,9 +54,9 @@ public abstract class SortedSpillBag ext
         //count for number of objects that have spilled
         if(mSpillFiles == null)
             incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS);
-
+        
         long spilled = 0;
-
+        
         DataOutputStream out = null;
         try {
             out = getSpillFile();
@@ -71,13 +71,13 @@ public abstract class SortedSpillBag ext
             //sort the tuples
             // as per documentation of collection.sort(), it copies to an array,
             // sorts and copies back to collection
-            // Avoiding that extra copy back to collection (mContents) by
+            // Avoiding that extra copy back to collection (mContents) by 
             // copying to an array and using Arrays.sort
             Tuple[] array = new Tuple[mContents.size()];
             mContents.toArray(array);
             if(comp == null)
                 Arrays.sort(array);
-            else
+            else 
                 Arrays.sort(array,comp);
 
             //dump the array
@@ -89,15 +89,12 @@ public abstract class SortedSpillBag ext
             }
 
             out.flush();
-            out.close();
-            out = null;
-            mContents.clear();
-        } catch (Throwable e) {
+        } catch (IOException ioe) {
             // Remove the last file from the spilled array, since we failed to
             // write to it.
             mSpillFiles.remove(mSpillFiles.size() - 1);
             warn(
-                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
+                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
             return 0;
         } finally {
             if (out != null) {
@@ -108,9 +105,11 @@ public abstract class SortedSpillBag ext
                 }
             }
         }
+        mContents.clear();
+        
         incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, spilled);
-
+        
         return spilled;
     }
-
+    
 }

Modified: pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java Fri Feb 24 03:34:37 2017
@@ -28,7 +28,7 @@ public class UnlimitedNullTuple extends
 
     @Override
     public int size() {
-        return Integer.MAX_VALUE;
+        throw new RuntimeException("Unimplemented");
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java Fri Feb 24 03:34:37 2017
@@ -61,25 +61,25 @@ public class SedesHelper {
     public static void writeChararray(DataOutput out, String s) throws IOException {
         // a char can take up to 3 bytes in the modified utf8 encoding
         // used by DataOutput.writeUTF, so use UNSIGNED_SHORT_MAX/3
-        byte[] utfBytes = s.getBytes(BinInterSedes.UTF8);
-        int length = utfBytes.length;
-        if (length < BinInterSedes.UNSIGNED_SHORT_MAX) {
+        if (s.length() < BinInterSedes.UNSIGNED_SHORT_MAX / 3) {
             out.writeByte(BinInterSedes.SMALLCHARARRAY);
-            out.writeShort(length);
+            out.writeUTF(s);
         } else {
+            byte[] utfBytes = s.getBytes(BinInterSedes.UTF8);
+            int length = utfBytes.length;
+
             out.writeByte(BinInterSedes.CHARARRAY);
             out.writeInt(length);
+            out.write(utfBytes);
         }
-        out.write(utfBytes);
     }
 
     public static String readChararray(DataInput in, byte type) throws IOException {
-        int size;
         if (type == BinInterSedes.SMALLCHARARRAY) {
-            size = in.readUnsignedShort();
-        } else {
-            size = in.readInt();
+            return in.readUTF();
         }
+
+        int size = in.readInt();
         byte[] buf = new byte[size];
         in.readFully(buf);
         return new String(buf, BinInterSedes.UTF8);

Modified: pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java Fri Feb 24 03:34:37 2017
@@ -38,12 +38,6 @@ public class PigImplConstants {
     public static final String PIG_OPTIMIZER_RULES_KEY = "pig.optimizer.rules";
 
     /**
-     * Used by pig to indicate that current job is running in local mode (local/tez_local)
-     * ie. ExecType.isLocal() is true
-     */
-    public static final String PIG_EXECTYPE_MODE_LOCAL = "pig.exectype.mode.local";
-
-    /**
      * Used by pig to indicate that current job has been converted to run in local mode
      */
     public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
@@ -69,24 +63,4 @@ public class PigImplConstants {
      * Parallelism to be used for CROSS operation by GFCross UDF
      */
     public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism";
-
-    /**
-     * Pig context
-     */
-    public static final String PIG_CONTEXT = "pig.pigContext";
-
-    /**
-     * Pig log4j properties
-     */
-    public static final String PIG_LOG4J_PROPERTIES = "pig.log4j.properties";
-
-    /**
-     * A unique id for a Pig session used as callerId for underlining component
-     */
-    public static final String PIG_AUDIT_ID = "pig.script.id";
-
-    // Kill the jobs before cleaning up tmp files
-    public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3;
-    public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2;
-    public static int SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY = 1;
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Fri Feb 24 03:34:37 2017
@@ -64,13 +64,13 @@ import org.apache.pig.impl.util.ObjectSe
 public class DefaultIndexableLoader extends LoadFunc implements IndexableLoadFunc{
 
     private static final Log LOG = LogFactory.getLog(DefaultIndexableLoader.class);
-
+    
     // FileSpec of index file which will be read from HDFS.
     private String indexFile;
     private String indexFileLoadFuncSpec;
-
+    
     private LoadFunc loader;
-    // Index is modeled as FIFO queue and LinkedList implements java Queue interface.
+    // Index is modeled as FIFO queue and LinkedList implements java Queue interface.  
     private LinkedList<Tuple> index;
     private FuncSpec rightLoaderFuncSpec;
 
@@ -79,9 +79,9 @@ public class DefaultIndexableLoader exte
     private transient TupleFactory mTupleFactory;
 
     private String inpLocation;
-
+    
     public DefaultIndexableLoader(
-            String loaderFuncSpec,
+            String loaderFuncSpec, 
             String indexFile,
             String indexFileLoadFuncSpec,
             String scope,
@@ -93,39 +93,39 @@ public class DefaultIndexableLoader exte
         this.scope = scope;
         this.inpLocation = inputLocation;
     }
-
+    
     @SuppressWarnings("unchecked")
     @Override
     public void seekNear(Tuple keys) throws IOException{
         // some setup
         mTupleFactory = TupleFactory.getInstance();
 
-        /* Currently whole of index is read into memory. Typically, index is small. Usually
+        /* Currently whole of index is read into memory. Typically, index is small. Usually 
            few KBs in size. So, this should not be an issue.
            However, reading whole index at startup time is not required. So, this can be improved upon.
            Assumption: Index being read is sorted on keys followed by filename, followed by offset.
          */
 
         // Index is modeled as FIFO Queue, that frees us from keeping track of which index entry should be read next.
-
+        
         // the keys are sent in a tuple. If there is really only
         // 1 join key, it would be the first field of the tuple. If
         // there are multiple Join keys, the tuple itself represents
         // the join key
         Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
         POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)));
-
+                
         Properties props = ConfigurationUtil.getLocalFSProperties();
         PigContext pc = new PigContext(ExecType.LOCAL, props);
         ld.setPc(pc);
         index = new LinkedList<Tuple>();
         for(Result res=ld.getNextTuple();res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNextTuple())
-            index.offer((Tuple) res.result);
-
+            index.offer((Tuple) res.result);   
 
+        
         Tuple prevIdxEntry = null;
         Tuple matchedEntry;
-
+     
         // When the first call is made, we need to seek into right input at correct offset.
         while(true){
             // Keep looping till we find first entry in index >= left key
@@ -148,15 +148,15 @@ public class DefaultIndexableLoader exte
                 prevIdxEntry = curIdxEntry;
                 continue;
             }
-
+            
             if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){
                 index.addFirst(curIdxEntry);  // We need to add back the current index Entry because we are reading ahead.
                 if(null == prevIdxEntry)   // very first entry in index.
                     matchedEntry = curIdxEntry;
                 else{
-                    matchedEntry = prevIdxEntry;
+                    matchedEntry = prevIdxEntry; 
                     // start join from previous idx entry, it might have tuples
-                    // with this key
+                    // with this key                    
                     index.addFirst(prevIdxEntry);
                 }
                 break;
@@ -168,43 +168,43 @@ public class DefaultIndexableLoader exte
         if (matchedEntry == null) {
             LOG.warn("Empty index file: input directory is empty");
         } else {
-
+        
             Object extractedKey = extractKeysFromIdxTuple(matchedEntry);
-
+            
             if (extractedKey != null) {
                 Class idxKeyClass = extractedKey.getClass();
                 if( ! firstLeftKey.getClass().equals(idxKeyClass)){
-
+    
                     // This check should indeed be done on compile time. But to be on safe side, we do it on runtime also.
                     int errCode = 2166;
                     String errMsg = "Key type mismatch. Found key of type "+firstLeftKey.getClass().getCanonicalName()+" on left side. But, found key of type "+ idxKeyClass.getCanonicalName()+" in index built for right side.";
                     throw new ExecException(errMsg,errCode,PigException.BUG);
                 }
-            }
+            } 
         }
-
+        
         //add remaining split indexes to splitsAhead array
         int [] splitsAhead = new int[index.size()];
         int splitsAheadIdx = 0;
         for(Tuple t : index){
             splitsAhead[splitsAheadIdx++] = (Integer) t.get( t.size()-1 );
         }
-
+        
         initRightLoader(splitsAhead);
     }
-
+    
     private void initRightLoader(int [] splitsToBeRead) throws IOException{
-        Properties properties = (Properties) ObjectSerializer
-                .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.client.sys.props"));
-
-        Configuration conf = ConfigurationUtil.toConfiguration(properties);
-
+        PigContext pc = (PigContext) ObjectSerializer
+                .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.pigContext"));
+        
+        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+        
         // Hadoop security need this property to be set
         if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
-            conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY,
+            conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY, 
                     System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
         }
-
+        
         //create ReadToEndLoader that will read the given splits in order
         loader = new ReadToEndLoader((LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec),
                 conf, inpLocation, splitsToBeRead);
@@ -216,7 +216,7 @@ public class DefaultIndexableLoader exte
 
         if(idxTupSize == 3)
             return idxTuple.get(0);
-
+        
         int numColsInKey = (idxTupSize - 2);
         List<Object> list = new ArrayList<Object>(numColsInKey);
         for(int i=0; i < numColsInKey; i++)
@@ -228,13 +228,13 @@ public class DefaultIndexableLoader exte
     private OperatorKey genKey(){
         return new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope));
     }
-
+    
     @Override
     public Tuple getNext() throws IOException {
         Tuple t = loader.getNext();
         return t;
     }
-
+    
     @Override
     public void close() throws IOException {
     }
@@ -242,14 +242,14 @@ public class DefaultIndexableLoader exte
     @Override
     public void initialize(Configuration conf) throws IOException {
         // nothing to do
-
+        
     }
 
     @Override
     public InputFormat getInputFormat() throws IOException {
         throw new UnsupportedOperationException();
     }
-
+    
     @Override
     public LoadCaster getLoadCaster() throws IOException {
         throw new UnsupportedOperationException();
@@ -264,7 +264,7 @@ public class DefaultIndexableLoader exte
     public void setLocation(String location, Job job) throws IOException {
         // nothing to do
     }
-
+    
     public void setIndexFile(String indexFile) {
         this.indexFile = indexFile;
     }

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java Fri Feb 24 03:34:37 2017
@@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -43,7 +42,7 @@ public class GFCross extends EvalFunc<Da
     private BagFactory mBagFactory = BagFactory.getInstance();
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
     private int parallelism = 0;
-    private Random r;
+    private Random r = new Random();
     private String crossKey;
 
     static private final int DEFAULT_PARALLELISM = 96;
@@ -70,14 +69,6 @@ public class GFCross extends EvalFunc<Da
                 if (parallelism < 0) {
                     throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey  + " was " + parallelism);
                 }
-                long taskIdHashCode = cfg.get(MRConfiguration.TASK_ID).hashCode();
-                long seed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
-                r = new Random(seed);
-            } else {
-                // Don't see a case where cfg can be null.
-                // But there is an existing testcase TestGFCross.testDefault
-                // Using constant generated from task_14738102975522_0001_r_000000 hashcode
-                r = new Random(-4235927512599300514L);
             }
 
             numInputs = (Integer)input.get(0);

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Fri Feb 24 03:34:37 2017
@@ -90,9 +90,7 @@ public class PoissonSampleLoader extends
             // number of tuples to be skipped
             Tuple t = loader.getNext();
             if(t == null) {
-                // since skipInterval is -1, no previous sample,
-                // and next sample is null -> the data set is empty
-                return null;
+                return createNumRowTuple(null);
             }
             long availRedMem = (long) ( totalMemory * heapPerc);
             // availRedMem = 155084396;

Modified: pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java Fri Feb 24 03:34:37 2017
@@ -57,8 +57,6 @@ public class NullableTuple extends PigNu
     public void readFields(DataInput in) throws IOException {
         boolean nullness = in.readBoolean();
         setNull(nullness);
-        // Free up the previous value for GC
-        mValue = null;
         if (!nullness) {
             mValue = bis.readTuple(in);
         }

Modified: pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java Fri Feb 24 03:34:37 2017
@@ -102,7 +102,7 @@ public class PigFile {
         if(oc.needsTaskCommit(tac)) {
             oc.commitTask(tac);
         }
-        oc.commitJob(jc);
+        HadoopShims.commitOrCleanup(oc, jc);
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java Fri Feb 24 03:34:37 2017
@@ -40,16 +40,17 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
  * This is wrapper Loader which wraps a real LoadFunc underneath and allows
- * to read a file completely starting a given split (indicated by a split index
+ * to read a file completely starting a given split (indicated by a split index 
  * which is used to look in the List<InputSplit> returned by the underlying
  * InputFormat's getSplits() method). So if the supplied split index is 0, this
  * loader will read the entire file. If it is non zero it will read the partial
  * file beginning from that split to the last split.
- *
+ * 
  * The call sequence to use this is:
  * 1) construct an object using the constructor
  * 2) Call getNext() in a loop till it returns null
@@ -60,50 +61,52 @@ public class ReadToEndLoader extends Loa
      * the wrapped LoadFunc which will do the actual reading
      */
     private LoadFunc wrappedLoadFunc;
-
+    
     /**
      * the Configuration object used to locate the input location - this will
      * be used to call {@link LoadFunc#setLocation(String, Job)} on
      * the wrappedLoadFunc
      */
     private Configuration conf;
-
+    
     /**
      * the input location string (typically input file/dir name )
      */
     private String inputLocation;
-
+      
     /**
      * If the splits to be read are not in increasing sequence of integers
      * this array can be used
      */
     private int[] toReadSplits = null;
-
+    
     /**
      * index into toReadSplits
      */
     private int toReadSplitsIdx = 0;
-
+    
     /**
      * the index of the split the loader is currently reading from
      */
     private int curSplitIndex;
-
+    
     /**
      * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)}
      */
     private List<InputSplit> inpSplits = null;
-
+    
     /**
      * underlying RecordReader
      */
     private RecordReader reader = null;
-
+    
     /**
      * underlying InputFormat
      */
     private InputFormat inputFormat = null;
-
+    
+    private PigContext pigContext;
+    
     private String udfContextSignature = null;
 
     /**
@@ -111,8 +114,8 @@ public class ReadToEndLoader extends Loa
      * @param conf
      * @param inputLocation
      * @param splitIndex
-     * @throws IOException
-     * @throws InterruptedException
+     * @throws IOException 
+     * @throws InterruptedException 
      */
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int splitIndex) throws IOException {
@@ -122,7 +125,17 @@ public class ReadToEndLoader extends Loa
         this.curSplitIndex = splitIndex;
         init();
     }
-
+    
+    public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
+            String inputLocation, int splitIndex, PigContext pigContext) throws IOException {
+        this.wrappedLoadFunc = wrappedLoadFunc;
+        this.inputLocation = inputLocation;
+        this.conf = conf;
+        this.curSplitIndex = splitIndex;
+        this.pigContext = pigContext;
+        init();
+    }
+    
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int splitIndex, String signature) throws IOException {
         this.udfContextSignature = signature;
@@ -134,14 +147,14 @@ public class ReadToEndLoader extends Loa
     }
 
     /**
-     * This constructor takes an array of split indexes (toReadSplitIdxs) of the
+     * This constructor takes an array of split indexes (toReadSplitIdxs) of the 
      * splits to be read.
      * @param wrappedLoadFunc
      * @param conf
      * @param inputLocation
      * @param toReadSplitIdxs
-     * @throws IOException
-     * @throws InterruptedException
+     * @throws IOException 
+     * @throws InterruptedException 
      */
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int[] toReadSplitIdxs) throws IOException {
@@ -153,21 +166,21 @@ public class ReadToEndLoader extends Loa
             toReadSplitIdxs.length > 0 ? toReadSplitIdxs[0] : Integer.MAX_VALUE;
         init();
     }
-
+    
     @SuppressWarnings("unchecked")
     private void init() throws IOException {
-        if (conf != null) {
-            SchemaTupleBackend.initialize(conf, true);
+        if (conf != null && pigContext != null) {
+            SchemaTupleBackend.initialize(conf, pigContext, true);
         }
 
         // make a copy so that if the underlying InputFormat writes to the
         // conf, we don't affect the caller's copy
         conf = new Configuration(conf);
 
-        // let's initialize the wrappedLoadFunc
+        // let's initialize the wrappedLoadFunc 
         Job job = new Job(conf);
         wrappedLoadFunc.setUDFContextSignature(this.udfContextSignature);
-        wrappedLoadFunc.setLocation(inputLocation,
+        wrappedLoadFunc.setLocation(inputLocation, 
                 job);
         // The above setLocation call could write to the conf within
         // the job - get a hold of the modified conf
@@ -178,10 +191,10 @@ public class ReadToEndLoader extends Loa
                     new JobID()));
         } catch (InterruptedException e) {
             throw new IOException(e);
-        }
+        }        
     }
 
-    private boolean initializeReader() throws IOException,
+    private boolean initializeReader() throws IOException, 
     InterruptedException {
         // Close the previous reader first
         if(reader != null) {
@@ -193,14 +206,14 @@ public class ReadToEndLoader extends Loa
             return false;
         }
         InputSplit curSplit = inpSplits.get(curSplitIndex);
-        TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf,
+        TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf, 
                 new TaskAttemptID());
         reader = inputFormat.createRecordReader(curSplit, tAContext);
         reader.initialize(curSplit, tAContext);
         // create a dummy pigsplit - other than the actual split, the other
         // params are really not needed here where we are just reading the
         // input completely
-        PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1,
+        PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1, 
                 new ArrayList<OperatorKey>(), -1);
         // Set the conf object so that if the wrappedLoadFunc uses it,
         // it won't be null
@@ -231,7 +244,7 @@ public class ReadToEndLoader extends Loa
             throw new IOException(e);
         }
     }
-
+    
     private Tuple getNextHelper() throws IOException, InterruptedException {
         Tuple t = null;
         while(initializeReader()) {
@@ -245,8 +258,8 @@ public class ReadToEndLoader extends Loa
         }
         return null;
     }
-
-
+    
+    
     /**
      * Updates curSplitIndex , just increment if splitIndexes is null,
      * else get next split in splitIndexes
@@ -318,7 +331,7 @@ public class ReadToEndLoader extends Loa
              ((LoadMetadata) wrappedLoadFunc).setPartitionFilter(partitionFilter);
         }
     }
-
+    
     @Override
     public void setUDFContextSignature(String signature) {
         this.udfContextSignature = signature;

Modified: pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java Fri Feb 24 03:34:37 2017
@@ -20,78 +20,43 @@ package org.apache.pig.impl.plan;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 
-/**
- * Generates IDs as long values in a thread safe manner. Each thread has its own generated IDs.
- */
 public class NodeIdGenerator {
 
-	/**
-	 * Holds a map of generated scoped-IDs per thread. Each map holds generated IDs per scope.
-	 */
-    private ThreadLocal<Map<String, AtomicLong>> scopeToIdMap
-        = new ThreadLocal<Map<String, AtomicLong>>() {
-            protected Map<String, AtomicLong> initialValue() {
-                return new HashMap<String,AtomicLong>();
-            }
-        };
-
-    /**
-     * Singleton instance.
-     */
-    private static final NodeIdGenerator theGenerator = new NodeIdGenerator();
-
-    /**
-     * Private default constructor to force singleton use-case of this class.
-     */
-    private NodeIdGenerator() {}
-
-    /**
-     * Returns the NodeIdGenerator singleton.
-     * @return
-     */
+    private Map<String, Long> scopeToIdMap;
+    private static NodeIdGenerator theGenerator = new NodeIdGenerator();
+
+    private NodeIdGenerator() {
+        scopeToIdMap = new HashMap<String, Long>();
+    }
+
     public static NodeIdGenerator getGenerator() {
         return theGenerator;
     }
 
-    /**
-     * Returns the next ID to be used for the current Thread.
-     * 
-     * @param scope
-     * @return
-     */
-    public long getNextNodeId(final String scope) {
-        // ThreadLocal usage protects us from having the same HashMap instance
-        // being used by several threads, so we can use it without synchronized
-        // blocks and still be thread-safe.
-        Map<String, AtomicLong> map = scopeToIdMap.get();
-
-        // the concurrent properties of the AtomicLong are useless here but
-        // since it cost less to use such an object rather than created a
-        // Long object instance each time we increment a counter ...
-        AtomicLong l = map.get(scope);
-        if ( l == null )
-            map.put( scope, l = new AtomicLong() );
-        return l.getAndIncrement();
+    public long getNextNodeId(String scope) {
+        Long val = scopeToIdMap.get(scope);
+
+        long nextId = 0;
+
+        if (val != null) {
+            nextId = val.longValue();
+        }
+
+        scopeToIdMap.put(scope, nextId + 1);
+
+        return nextId;
     }
 
-    /**
-     * Reset the given scope IDs to 0 for the current Thread.
-     * @param scope
-     */
     @VisibleForTesting
-    public static void reset(final String scope) {
-        theGenerator.scopeToIdMap.get().remove(scope);
+    public static void reset(String scope) {
+        theGenerator.scopeToIdMap.put(scope, 0L) ;
     }
 
-    /**
-     * Reset all scope IDs to 0 for the current Thread.
-     */
     @VisibleForTesting
     public static void reset() {
-        theGenerator.scopeToIdMap.remove();
+        theGenerator.scopeToIdMap.clear();
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java Fri Feb 24 03:34:37 2017
@@ -150,13 +150,12 @@ public class ExecutableManager {
 
         LOG.debug("Process exited with: " + exitCode);
         if (exitCode != SUCCESS) {
-            String errMsg = "'" + command.toString() + "'" + " failed with exit status: " + exitCode;
-            LOG.error(errMsg);
-            Result res = new Result(POStatus.STATUS_ERR, errMsg);
-            sendOutput(poStream.getBinaryOutputQueue(), res);
+            LOG.error(command + " failed with exit status: "
+                    + exitCode);
         }
 
-        if (exitCode == SUCCESS && outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
+        if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
+
             // Trigger the outputHandler
             outputHandler.bindTo("", null, 0, -1);
 
@@ -179,18 +178,10 @@ public class ExecutableManager {
      * @param process the process to be killed
      * @throws IOException
      */
-    private void killProcess(Process process) {
+    private void killProcess(Process process) throws IOException {
         if (process != null) {
-            try {
-                inputHandler.close(process);
-            } catch (Exception e) {
-                LOG.info("Exception in killProcess while closing inputHandler. Ignoring:" + e.getMessage());
-            }
-            try {
-                outputHandler.close();
-            } catch (Exception e) {
-                LOG.info("Exception in killProcess while closing outputHandler. Ignoring:" + e.getMessage());
-            }
+            inputHandler.close(process);
+            outputHandler.close();
             process.destroy();
         }
     }
@@ -343,7 +334,7 @@ public class ExecutableManager {
                                 // we will only call close() here and not
                                 // worry about deducing whether the process died
                                 // normally or abnormally - if there was any real
-                                // issue we should see
+                                // issue the ProcessOutputThread should see
                                 // a non zero exit code from the process and send
                                 // a POStatus.STATUS_ERR back - what if we got
                                 // an IOException because there was only an issue with
@@ -353,6 +344,14 @@ public class ExecutableManager {
                                 return;
                             } else {
                                 // asynchronous case - then this is a real exception
+                                LOG.error("Exception while trying to write to stream binary's input", e);
+                                // send POStatus.STATUS_ERR to POStream to signal the error
+                                // Generally the ProcessOutputThread would do this but now
+                                // we should do it here since neither the process nor the
+                                // ProcessOutputThread will ever be spawned
+                                Result res = new Result(POStatus.STATUS_ERR,
+                                        "Exception while trying to write to stream binary's input" + e.getMessage());
+                                sendOutput(poStream.getBinaryOutputQueue(), res);
                                 throw e;
                             }
                         }
@@ -363,13 +362,13 @@ public class ExecutableManager {
             } catch (Throwable t) {
                 // Note that an error occurred
                 outerrThreadsError = t;
-                Result res = new Result(POStatus.STATUS_ERR,
-                                        "Error while reading from POStream and " +
-                                        "passing it to the streaming process:" + t.getMessage());
-                LOG.error("Error while reading from POStream and " +
-                          "passing it to the streaming process:", t);
-                sendOutput(poStream.getBinaryOutputQueue(), res);
-                killProcess(process);
+                LOG.error( "Error while reading from POStream and " +
+                           "passing it to the streaming process", t);
+                try {
+                    killProcess(process);
+                } catch (IOException ioe) {
+                    LOG.warn(ioe);
+                }
             }
         }
     }
@@ -453,7 +452,13 @@ public class ExecutableManager {
                 try {
                     exitCode = process.waitFor();
                 } catch (InterruptedException ie) {
-                    killProcess(process);
+                    try {
+                        killProcess(process);
+                    } catch (IOException e) {
+                        LOG.warn("Exception trying to kill process while processing null output " +
+                                "from binary", e);
+
+                    }
                     // signal error
                     String errMsg = "Failure while waiting for process (" + command.toString() + ")" +
                             ie.getMessage();

Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java Fri Feb 24 03:34:37 2017
@@ -175,10 +175,8 @@ public abstract class OutputHandler {
      */
     public synchronized void close() throws IOException {
         if(!alreadyClosed) {
-            if( istream != null ) {
-                istream.close();
-                istream = null;
-            }
+            istream.close();
+            istream = null;
             alreadyClosed = true;
         }
     }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java Fri Feb 24 03:34:37 2017
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.tools.bzip2r.BZip2Constants;
 import org.joda.time.DateTime;
@@ -65,6 +66,7 @@ public class JarManager {
         BZIP2R(BZip2Constants.class),
         AUTOMATON(Automaton.class),
         ANTLR(CommonTokenStream.class),
+        GUAVA(Multimaps.class),
         JODATIME(DateTime.class);
 
         private final Class pkgClass;
@@ -206,8 +208,11 @@ public class JarManager {
     public static List<String> getDefaultJars() {
         List<String> defaultJars = new ArrayList<String>();
         for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
+            if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) {
+                continue; //Skip
+            }
             String jar = findContainingJar(pkgToSend.getPkgClass());
-            if (jar != null && !defaultJars.contains(jar)) {
+            if (!defaultJars.contains(jar)) {
                 defaultJars.add(jar);
             }
         }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri Feb 24 03:34:37 2017
@@ -51,9 +51,10 @@ public class SpillableMemoryManager impl
 
     private static final Log log = LogFactory.getLog(SpillableMemoryManager.class);
 
+    private static final int ONE_GB = 1024 * 1024 * 1024;
     private static final int UNUSED_MEMORY_THRESHOLD_DEFAULT = 350 * 1024 * 1024;
-    private static final float MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7f;
-    private static final float COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7f;
+    private static final double MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7;
+    private static final double COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7;
 
     private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
     // References to spillables with size
@@ -85,7 +86,7 @@ public class SpillableMemoryManager impl
 
     // fraction of the total heap used for the threshold to determine
     // if we want to perform an extra gc before the spill
-    private float extraGCThresholdFraction = 0.05f;
+    private double extraGCThresholdFraction = 0.05;
     private long extraGCSpillSizeThreshold  = 0L;
 
     private volatile boolean blockRegisterOnSpill = false;
@@ -141,7 +142,7 @@ public class SpillableMemoryManager impl
      * @param unusedMemoryThreshold
      *            Unused memory size below which we want to get notifications
      */
-    private void configureMemoryThresholds(float memoryThresholdFraction, float collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
+    private void configureMemoryThresholds(double memoryThresholdFraction, double collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
         long tenuredHeapSize = tenuredHeap.getUsage().getMax();
         memoryThresholdSize = (long)(tenuredHeapSize * memoryThresholdFraction);
         collectionThresholdSize = (long)(tenuredHeapSize * collectionMemoryThresholdFraction);
@@ -183,8 +184,8 @@ public class SpillableMemoryManager impl
 
         spillFileSizeThreshold = conf.getLong("pig.spill.size.threshold", spillFileSizeThreshold);
         gcActivationSize = conf.getLong("pig.spill.gc.activation.size", gcActivationSize);
-        float memoryThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT);
-        float collectionThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT);
+        double memoryThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT);
+        double collectionThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT);
         long unusedMemoryThreshold = conf.getLong(PigConfiguration.PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE, UNUSED_MEMORY_THRESHOLD_DEFAULT);
         configureMemoryThresholds(memoryThresholdFraction, collectionThresholdFraction, unusedMemoryThreshold);
     }
@@ -198,7 +199,7 @@ public class SpillableMemoryManager impl
         // used - heapmax/2 + heapmax/4
         long toFree = 0L;
         if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
-            toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);
+            toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call- Usage threshold "
@@ -210,7 +211,7 @@ public class SpillableMemoryManager impl
                 log.debug(msg);
             }
         } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
-            toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);
+            toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call - Collection threshold "

Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Fri Feb 24 03:34:37 2017
@@ -48,7 +48,6 @@ import org.apache.hadoop.io.compress.BZi
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
@@ -94,10 +93,18 @@ public class Utils {
     	  return System.getProperty("java.vendor").contains("IBM");
     }
 
-    public static boolean is64bitJVM() {
-        String arch = System.getProperties().getProperty("sun.arch.data.model",
-                System.getProperty("com.ibm.vm.bitmode"));
-        return arch != null && arch.equals("64");
+    public static boolean isHadoop23() {
+        String version = org.apache.hadoop.util.VersionInfo.getVersion();
+        if (version.matches("\\b0\\.23\\..+\\b"))
+            return true;
+        return false;
+    }
+
+    public static boolean isHadoop2() {
+        String version = org.apache.hadoop.util.VersionInfo.getVersion();
+        if (version.matches("\\b2\\.\\d+\\..+"))
+            return true;
+        return false;
     }
 
     /**
@@ -567,11 +574,6 @@ public class Utils {
         return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
     }
 
-    public static boolean isLocal(Configuration conf) {
-        return conf.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false)
-                || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
-    }
-
     // PIG-3929 use parameter substitution for pig properties similar to Hadoop Configuration
     // Following code has been borrowed from Hadoop's Configuration#substituteVars
     private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
@@ -695,15 +697,4 @@ public class Utils {
             DateTimeZone.setDefault(DateTimeZone.forID(dtzStr));
         }
     }
-
-    /**
-     * Add shutdown hook that runs before the FileSystem cache shutdown happens.
-     *
-     * @param hook code to execute during shutdown
-     * @param priority Priority over the  FileSystem.SHUTDOWN_HOOK_PRIORITY
-     */
-    public static void addShutdownHookWithPriority(Runnable hook, int priority) {
-        ShutdownHookManager.get().addShutdownHook(hook,
-                FileSystem.SHUTDOWN_HOOK_PRIORITY + priority);
-    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Fri Feb 24 03:34:37 2017
@@ -118,8 +118,6 @@ public class AvroStorageDataConversionUt
         return ByteBuffer.wrap(((DataByteArray) o).get());
       case FIXED:
         return new GenericData.Fixed(s, ((DataByteArray) o).get());
-      case ENUM:
-        return new GenericData.EnumSymbol(s,o.toString());
       default:
         if (DataType.findType(o) == DataType.DATETIME) {
           return ((DateTime) o).getMillis();

Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Fri Feb 24 03:34:37 2017
@@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -50,7 +49,6 @@ import java.util.Map;
 public final class AvroTupleWrapper <T extends IndexedRecord>
     implements Tuple {
     private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
-    private TupleFactory mTupleFactory = TupleFactory.getInstance();
 
   /**
    * The Avro object wrapped in the pig Tuple.
@@ -66,9 +64,9 @@ public final class AvroTupleWrapper <T e
   }
 
   @Override
-  public void write(DataOutput out) throws IOException {
-      Tuple t = mTupleFactory.newTupleNoCopy(getAll());
-      t.write(out);
+  public void write(final DataOutput o) throws IOException {
+    throw new IOException(
+        this.getClass().toString() + ".write called, but not implemented yet");
   }
 
   @SuppressWarnings("rawtypes")

Modified: pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java Fri Feb 24 03:34:37 2017
@@ -98,17 +98,13 @@ public abstract class FilterExtractor {
     public void visit() throws FrontendException {
         // we will visit the leaf and it will recursively walk the plan
         LogicalExpression leaf = (LogicalExpression)originalPlan.getSources().get( 0 );
-
-        // recursively traverse the tree bottom up
-        // checkPushdown returns KeyState which is pair of LogicalExpression
-        KeyState finale = null;
-        if (leaf instanceof BinaryExpression) {
-            finale = checkPushDown((BinaryExpression) leaf);
-        } else if (leaf instanceof UnaryExpression) {
-            finale = checkPushDown((UnaryExpression) leaf);
-        }
-
-        if (finale != null) {
+        // if the leaf is a unary operator it should be a FilterFunc in
+        // which case we don't try to extract partition filter conditions
+        if(leaf instanceof BinaryExpression) {
+            // recursively traverse the tree bottom up
+            // checkPushdown returns KeyState which is pair of LogicalExpression
+            BinaryExpression binExpr = (BinaryExpression)leaf;
+            KeyState finale = checkPushDown(binExpr);
             this.filterExpr = finale.filterExpr;
             this.pushdownExpr = getExpression(finale.pushdownExpr);
         }
@@ -282,22 +278,12 @@ public abstract class FilterExtractor {
             if (unaryExpr instanceof CastExpression) {
                 return checkPushDown(unaryExpr.getExpression());
             }
-            // For IsNull, the child may not be a supported expression, e.g. MapLookupExpression.
-            // For NotExpression, the child, C, is broken into expressions P and F such that C = P AND F
-            // Consequently, NOT C = NOT P OR NOT F, which can't be expressed as an AND so both must be
-            // pushed or both used as a filter.
-            // For both cases, this expr can be pushed if and only if the entire child can be.
-            if (unaryExpr instanceof IsNullExpression || unaryExpr instanceof NotExpression) {
-                KeyState childState = checkPushDown(unaryExpr.getExpression());
-                if (childState.filterExpr == null) {
-                    // only push down if the entire expression can be pushed
-                    state.pushdownExpr = unaryExpr;
-                    state.filterExpr = null;
-                } else {
-                    removeFromFilteredPlan(childState.filterExpr);
-                    state.filterExpr = addToFilterPlan(unaryExpr);
-                    state.pushdownExpr = null;
-                }
+            if (unaryExpr instanceof IsNullExpression) {
+                state.pushdownExpr = unaryExpr;
+                state.filterExpr = null;
+            } else if (unaryExpr instanceof NotExpression) {
+                state.pushdownExpr = unaryExpr;
+                state.filterExpr = null;
             } else {
                 state.filterExpr = addToFilterPlan(unaryExpr);
                 state.pushdownExpr = null;

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Fri Feb 24 03:34:37 2017
@@ -323,7 +323,6 @@ public class ExpToPhyTranslationVisitor
     public void visit( CastExpression op ) throws FrontendException {
         POCast pCast = new POCast(new OperatorKey(DEFAULT_SCOPE, nodeGen
                 .getNextNodeId(DEFAULT_SCOPE)));
-        pCast.addOriginalLocation(op.getFieldSchema().alias, op.getLocation()) ;
 //        physOp.setAlias(op.getAlias());
         currentPlan.add(pCast);
 

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java Fri Feb 24 03:34:37 2017
@@ -95,8 +95,7 @@ public class MapLookupExpression extends
         LogicalFieldSchema predFS = successor.getFieldSchema();
         if (predFS!=null) {
             if (predFS.type==DataType.MAP && predFS.schema!=null) {
-                fieldSchema = predFS.schema.getField(0);
-                return fieldSchema;
+                return (predFS.schema.getField(0));
             }
             else {
                 fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Fri Feb 24 03:34:37 2017
@@ -37,7 +37,6 @@ public class LOGenerate extends LogicalR
      // to store uid in mUserDefinedSchema
      private List<LogicalSchema> mUserDefinedSchema = null;
      private List<LogicalSchema> outputPlanSchemas = null;
-     private List<LogicalSchema> expSchemas = null;
      // If LOGenerate generate new uid, cache it here.
      // This happens when expression plan does not have complete schema, however,
      // user give complete schema in ForEach statement in script
@@ -72,7 +71,6 @@ public class LOGenerate extends LogicalR
         
         schema = new LogicalSchema();
         outputPlanSchemas = new ArrayList<LogicalSchema>();
-        expSchemas = new ArrayList<LogicalSchema>();
         
         for(int i=0; i<outputPlans.size(); i++) {
             LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
@@ -95,17 +93,19 @@ public class LOGenerate extends LogicalR
                 fieldSchema = exp.getFieldSchema().deepCopy();
                 
                 expSchema = new LogicalSchema();
-                if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG && fieldSchema.type != DataType.MAP) || !flattenFlags[i]) {
+                if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG)||!flattenFlags[i]) {
                     // if type is primitive, just add to schema
-                    if (fieldSchema != null)
+                    if (fieldSchema!=null)
                         expSchema.addField(fieldSchema);
+                    else
+                        expSchema = null;
                 } else {
-                    // if bag/tuple/map don't have inner schema, after flatten, we don't have schema for the entire operator
+                    // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator
                     if (fieldSchema.schema==null) {
                         expSchema = null;
                     }
                     else {
-                     // if we come here, we get a BAG/Tuple/Map with flatten, extract inner schema of the tuple as expSchema
+                     // if we come here, we get a BAG/Tuple with flatten, extract inner schema of the tuple as expSchema
                         List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
                         if (flattenFlags[i]) {
                             if (fieldSchema.type == DataType.BAG) {
@@ -117,23 +117,13 @@ public class LOGenerate extends LogicalR
                                         fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                     }
                                 }
-                            } else if (fieldSchema.type == DataType.MAP) {
-                                //should only contain 1 schemafield for Map's value
-                                innerFieldSchemas = fieldSchema.schema.getFields();
-                                LogicalSchema.LogicalFieldSchema fsForValue = innerFieldSchemas.get(0);
-                                fsForValue.alias = fieldSchema.alias + "::value";
-
-                                LogicalSchema.LogicalFieldSchema fsForKey = new LogicalFieldSchema(
-                                        fieldSchema.alias + "::key" , null, DataType.CHARARRAY, fieldSchema.uid);
-
-                                expSchema.addField(fsForKey);
                             } else { // DataType.TUPLE
                                 innerFieldSchemas = fieldSchema.schema.getFields();
                                 for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
                                     fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                 }
                             }
-
+                            
                             for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
                                 expSchema.addField(fs);
                         }
@@ -147,7 +137,6 @@ public class LOGenerate extends LogicalR
             if (expSchema!=null && expSchema.size()==0)
                 expSchema = null;
             LogicalSchema planSchema = new LogicalSchema();
-            expSchemas.add(expSchema);
             if (mUserDefinedSchemaCopy!=null) {
                 LogicalSchema mergedSchema = new LogicalSchema();
                 // merge with userDefinedSchema
@@ -157,6 +146,12 @@ public class LOGenerate extends LogicalR
                         fs.stampFieldSchema();
                         mergedSchema.addField(new LogicalFieldSchema(fs));
                     }
+                    for (LogicalFieldSchema fs : mergedSchema.getFields()) {
+                        if (fs.type == DataType.NULL){
+                            //this is the use case where a new alias has been specified by user
+                            fs.type = DataType.BYTEARRAY;
+                        }
+                    }
                 } else {
 
                     // Merge uid with the exp field schema
@@ -168,12 +163,8 @@ public class LOGenerate extends LogicalR
                     mergedSchema.mergeUid(expSchema);
 
                 }
-                for (LogicalFieldSchema fs : mergedSchema.getFields()) {
-                    if (fs.type==DataType.NULL) {
-                        fs.type = DataType.BYTEARRAY;
-                    }
+                for (LogicalFieldSchema fs : mergedSchema.getFields())
                     planSchema.addField(fs);
-                }
             } else {
                 // if any plan do not have schema, the whole LOGenerate do not have schema
                 if (expSchema==null) {
@@ -319,8 +310,4 @@ public class LOGenerate extends LogicalR
         super.resetSchema();
         outputPlanSchemas = null;
     }
-
-    public List<LogicalSchema> getExpSchemas() {
-        return expSchemas;
-    }
 }

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java Fri Feb 24 03:34:37 2017
@@ -38,7 +38,6 @@ public class LOJoin extends LogicalRelat
      */
     public static enum JOINTYPE {
         HASH,    // Hash Join
-        BLOOM,   // Bloom Join
         REPLICATED, // Fragment Replicated join
         SKEWED, // Skewed Join
         MERGE,   // Sort Merge Join

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Feb 24 03:34:37 2017
@@ -1414,7 +1414,7 @@ public class LogToPhyTranslationVisitor
 
             return;
         }
-        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH || loj.getJoinType() == LOJoin.JOINTYPE.BLOOM){
+        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
             POPackage poPackage = compileToLR_GR_PackTrio(loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans());
             POForEach fe = compileFE4Flattening(innerFlags,  scope, parallel, alias, location, inputs);
             currentPlan.add(fe);
@@ -1425,20 +1425,7 @@ public class LogToPhyTranslationVisitor
                         e.getErrorCode(),e.getErrorSource(),e);
             }
             logToPhyMap.put(loj, fe);
-            if (loj.getJoinType() == LOJoin.JOINTYPE.BLOOM) {
-                if (innerFlags.length == 2) {
-                    if (innerFlags[0] == false && innerFlags[1] == false) {
-                        throw new LogicalToPhysicalTranslatorException(
-                                "Error at " + loj.getLocation() + " with alias "+ loj.getAlias() +
-                                        ". Bloom join cannot be used with a FULL OUTER join.",
-                                1109,
-                                PigException.INPUT);
-                    }
-                }
-                poPackage.getPkgr().setPackageType(PackageType.BLOOMJOIN);
-            } else {
-                poPackage.getPkgr().setPackageType(PackageType.JOIN);
-            }
+            poPackage.getPkgr().setPackageType(PackageType.JOIN);
         }
         translateSoftLinks(loj);
     }

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Fri Feb 24 03:34:37 2017
@@ -48,7 +48,6 @@ import org.apache.pig.newplan.logical.vi
 import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
 import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover;
 import org.apache.pig.newplan.logical.visitor.DuplicateForEachColumnRewriteVisitor;
-import org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor;
 import org.apache.pig.newplan.logical.visitor.ImplicitSplitInsertVisitor;
 import org.apache.pig.newplan.logical.visitor.InputOutputFileValidatorVisitor;
 import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
@@ -176,7 +175,6 @@ public class LogicalPlan extends BaseOpe
         new ColumnAliasConversionVisitor(this).visit();
         new SchemaAliasVisitor(this).visit();
         new ScalarVisitor(this, pigContext, scope).visit();
-        new ForEachUserSchemaVisitor(this).visit();
 
         // ImplicitSplitInsertVisitor has to be called before
         // DuplicateForEachColumnRewriteVisitor.  Detail at pig-1766
@@ -191,15 +189,6 @@ public class LogicalPlan extends BaseOpe
 
         new TypeCheckingRelVisitor( this, collector).visit();
 
-
-        new UnionOnSchemaSetter(this).visit();
-        new CastLineageSetter(this, collector).visit();
-        new ScalarVariableValidator(this).visit();
-        new StoreAliasSetter(this).visit();
-
-        // compute whether output data is sorted or not
-        new SortInfoSetter(this).visit();
-
         boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
 
         if(aggregateWarning) {
@@ -210,6 +199,14 @@ public class LogicalPlan extends BaseOpe
             }
         }
 
+        new UnionOnSchemaSetter(this).visit();
+        new CastLineageSetter(this, collector).visit();
+        new ScalarVariableValidator(this).visit();
+        new StoreAliasSetter(this).visit();
+
+        // compute whether output data is sorted or not
+        new SortInfoSetter(this).visit();
+
         if (!(skipInputOutputValidation || pigContext.inExplain || pigContext.inDumpSchema)) {
             // Validate input/output file
             new InputOutputFileValidatorVisitor(this, pigContext).visit();

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Fri Feb 24 03:34:37 2017
@@ -150,39 +150,6 @@ public class LogicalSchema {
             }
             return true;
         }
-
-        // Check if fs1 is equal to fs2 with regard to type
-        public static boolean typeMatch(LogicalFieldSchema fs1, LogicalFieldSchema fs2) {
-            if (fs1==null && fs2==null) {
-                return true;
-            }
-            if (fs1==null || fs2==null) {
-                return false;
-            }
-            if (fs1.type!=fs2.type) {
-                return false;
-            }
-            if (DataType.isComplex(fs1.type)) {
-                LogicalSchema s1 = fs1.schema;
-                LogicalSchema s2 = fs2.schema;
-                if (s1==null && s2==null) {
-                    return true;
-                }
-                if (fs1==null || fs2==null) {
-                    return false;
-                }
-                if (s1.size()!=s2.size()) {
-                    return false;
-                }
-                for (int i=0;i<s1.size();i++) {
-                    if (!typeMatch(s1.getField(i), s2.getField(i))) {
-                        return false;
-                    }
-                }
-            }
-            return true;
-        }
-
         /**
          * Adds the uid from FieldSchema argument to this FieldSchema
          * If the argument is null, it stamps this FieldSchema with uid
@@ -480,23 +447,7 @@ public class LogicalSchema {
             LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
             return mergedFS;
         }
-
-        public static boolean isEqualUnlessUnknown(LogicalFieldSchema fs1, LogicalFieldSchema fs2) throws FrontendException {
-            if (fs1.type == DataType.BYTEARRAY) {
-                return true;
-            } else if (fs2.type == DataType.BYTEARRAY) {
-                return true;
-            } else if (fs1.type == fs2.type) {
-                if (DataType.isComplex(fs1.type)) {
-                    return LogicalSchema.isEqualUnlessUnknown(fs1.schema, fs2.schema);
-                } else {
-                    return true;
-                }
-            } else {
-                return false;
-            }
-        }
-
+        
         /***
          * Old Pig field schema does not require a tuple schema inside a bag;
          * Now it is required to have that; this method is to fill the gap
@@ -819,24 +770,7 @@ public class LogicalSchema {
         }
         return mergedSchema;
     }
-
-    public static boolean isEqualUnlessUnknown(LogicalSchema s1, LogicalSchema s2) throws FrontendException {
-        if (s1 == null) {
-            return true;
-        } else if (s2 == null) {
-            return true;
-        } else if (s1.size() != s2.size()) {
-            return false;
-        } else {
-            for (int i=0;i<s1.size();i++) {
-                if (!LogicalFieldSchema.isEqualUnlessUnknown(s1.getField(i), s1.getField(i))) {
-                    return false;
-                }
-            }
-            return true;
-        }
-    }
-
+    
     public String toString(boolean verbose) {
         StringBuilder str = new StringBuilder();
         

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java Fri Feb 24 03:34:37 2017
@@ -95,7 +95,7 @@ public class AddForEach extends WholePla
             }
             
             Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
-            if (outputUids==null || outputUids.size() == 0 )
+            if (outputUids==null)
                 return false;
             
             LogicalSchema schema = op.getSchema();

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java Fri Feb 24 03:34:37 2017
@@ -107,7 +107,7 @@ public class CastLineageSetter extends A
                 if(inLoadFunc == null){
                     String msg = "Cannot resolve load function to use for casting from " + 
                                 DataType.findTypeName(inType) + " to " +
-                                DataType.findTypeName(outType) + " at " + cast.getLocation() ;
+                                DataType.findTypeName(outType) + ". ";
                     msgCollector.collect(msg, MessageType.Warning,
                            PigWarning.NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY);
                 }else {

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Fri Feb 24 03:34:37 2017
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
@@ -730,44 +729,6 @@ public class LineageFindRelVisitor exten
             }
         }
 
-        @Override
-        public void visit(UserFuncExpression op) throws FrontendException {
-
-            if( op.getFieldSchema() == null ) {
-                return;
-            }
-
-            FuncSpec funcSpec = null;
-            Class loader = instantiateCaster(op.getFuncSpec());
-            List<LogicalExpression> arguments = op.getArguments();
-            if ( loader != null ) {
-                // if evalFunc.getLoadCaster() returns, simply use that.
-                funcSpec = op.getFuncSpec();
-            } else if (arguments.size() != 0 ) {
-                FuncSpec baseFuncSpec = null;
-                LogicalFieldSchema fs = arguments.get(0).getFieldSchema();
-                if ( fs != null ) {
-                    baseFuncSpec = uid2LoadFuncMap.get(fs.uid);
-                    if( baseFuncSpec != null ) {
-                        funcSpec = baseFuncSpec;
-                        for(int i = 1; i < arguments.size(); i++) {
-                            fs = arguments.get(i).getFieldSchema();
-                            if( fs == null || !haveIdenticalCasters(baseFuncSpec, uid2LoadFuncMap.get(fs.uid)) ) {
-                                funcSpec = null;
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-
-            if( funcSpec != null ) {
-                addUidLoadFuncToMap(op.getFieldSchema().uid, funcSpec);
-                // in case schema is nested, set funcSpec for all
-                setLoadFuncForUids(op.getFieldSchema().schema, funcSpec);
-            }
-        }
-
         /**
          * if there is a null constant under casts, return it
          * @param rel
@@ -809,8 +770,6 @@ public class LineageFindRelVisitor exten
                 caster = ((LoadFunc)obj).getLoadCaster();
             } else if (obj instanceof StreamToPig) {
                 caster = ((StreamToPig)obj).getLoadCaster();
-            } else if (obj instanceof EvalFunc) {
-                caster = ((EvalFunc)obj).getLoadCaster();
             } else {
                 throw new VisitorException("Invalid class type " + funcSpec.getClassName(),
                                            2270, PigException.BUG );

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java Fri Feb 24 03:34:37 2017
@@ -458,7 +458,6 @@ public class TypeCheckingExpVisitor exte
         collectCastWarning(node, arg.getType(), toFs.type, msgCollector);
 
         CastExpression cast = new CastExpression(plan, arg, toFs);
-        cast.setLocation(node.getLocation());
         try {
             // disconnect cast and arg because the connection is already
             // added by cast constructor and insertBetween call is going
@@ -491,7 +490,7 @@ public class TypeCheckingExpVisitor exte
         byte outType = cast.getType();
         if(outType == DataType.BYTEARRAY && inType != outType) {
             int errCode = 1051;
-            String msg = "Cannot cast from " + DataType.findTypeName(inType) + " to bytearray";
+            String msg = "Cannot cast to bytearray";
             msgCollector.collect(msg, MessageType.Error) ;
             throw new TypeCheckerException(cast, msg, errCode, PigException.INPUT) ;
         }
@@ -608,7 +607,7 @@ public class TypeCheckingExpVisitor exte
             // Matching schemas if we're working with tuples/bags
             if (DataType.isSchemaType(lhsType)) {
                 try {
-                    if(!LogicalFieldSchema.isEqualUnlessUnknown(binCond.getLhs().getFieldSchema(), binCond.getRhs().getFieldSchema())){
+                    if(! binCond.getLhs().getFieldSchema().isEqual(binCond.getRhs().getFieldSchema())){
                         int errCode = 1048;
                         String msg = "Two inputs of BinCond must have compatible schemas."
                             + " left hand side: " + binCond.getLhs().getFieldSchema()

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Fri Feb 24 03:34:37 2017
@@ -351,8 +351,7 @@ public class TypeCheckingRelVisitor exte
 
             if (outFieldSchema.type != fs.type) {
                 castNeededCounter++ ;
-                CastExpression castexp = new CastExpression(genPlan, project, outFieldSchema);
-                castexp.setLocation(toOp.getLocation());
+                new CastExpression(genPlan, project, outFieldSchema);
             }
 
             generatePlans.add(genPlan) ;

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java Fri Feb 24 03:34:37 2017
@@ -21,7 +21,6 @@ package org.apache.pig.newplan.logical.v
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.pig.PigException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.Pair;
@@ -111,20 +110,9 @@ public class UnionOnSchemaSetter extends
                 } else {
                     ProjectExpression projExpr = 
                         new ProjectExpression( exprPlan, genInputs.size(), 0, gen );
-                    if( opSchema.getField( pos ).type != fs.type ) {
-                        if( fs.type != DataType.BYTEARRAY ) {
-                            CastExpression castexpr = new CastExpression( exprPlan, projExpr, fs );
-                            castexpr.setLocation(union.getLocation());
-                        } else {
-                            int errCode = 1056;
-                            String msg = "Union of incompatible types not allowed. "
-                                         + "Cannot cast from "
-                                         + DataType.findTypeName(opSchema.getField( pos ).type)
-                                         + " to bytearray for '"
-                                         + opSchema.getField( pos ).alias
-                                         + "'. Please typecast to compatible types before union." ;
-                            throw new FrontendException(union, msg, errCode, PigException.INPUT) ;
-                        }
+                    if( fs.type != DataType.BYTEARRAY
+                        && opSchema.getField( pos ).type != fs.type ) {
+                        new CastExpression( exprPlan, projExpr, fs );
                     }
                     genInputs.add( new LOInnerLoad( innerPlan, foreach, pos ) );
                 }

Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Fri Feb 24 03:34:37 2017
@@ -34,7 +34,6 @@ import org.antlr.runtime.RecognitionExce
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
-import org.apache.pig.NonFSLoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -889,7 +888,7 @@ public class LogicalPlanBuilder {
             if (absolutePath == null) {
                 absolutePath = loFunc.relativeToAbsolutePath( filename, QueryParserUtils.getCurrentDir( pigContext ) );
 
-                if (absolutePath!=null && !(loFunc instanceof NonFSLoadFunc)) {
+                if (absolutePath!=null) {
                     QueryParserUtils.setHdfsServers( absolutePath, pigContext );
                 }
                 fileNameMap.put( fileNameKey, absolutePath );
@@ -1358,19 +1357,13 @@ public class LogicalPlanBuilder {
         return Long.parseLong( num );
     }
 
-    /**
-     * Parse big integer formatted string (e.g. "1234567890123BI") into BigInteger object
-     */
     static BigInteger parseBigInteger(String s) {
-        String num = s.substring( 0, s.length() - 2 );
+        String num = s.substring( 0, s.length() - 1 );
         return new BigInteger( num );
     }
 
-    /**
-     * Parse big decimal formatted string (e.g. "123456.7890123BD") into BigDecimal object
-     */
     static BigDecimal parseBigDecimal(String s) {
-        String num = s.substring( 0, s.length() - 2 );
+        String num = s.substring( 0, s.length() - 1 );
         return new BigDecimal( num );
     }
 
@@ -1788,8 +1781,6 @@ public class LogicalPlanBuilder {
             return JOINTYPE.REPLICATED;
          } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
              return LOJoin.JOINTYPE.HASH;
-         } else if( modifier.equalsIgnoreCase( "bloom" ) ) {
-             return LOJoin.JOINTYPE.BLOOM;
          } else if( modifier.equalsIgnoreCase( "skewed" ) ) {
              return JOINTYPE.SKEWED;
          } else if (modifier.equalsIgnoreCase("merge")) {
@@ -1798,7 +1789,7 @@ public class LogicalPlanBuilder {
              return JOINTYPE.MERGESPARSE;
          } else {
              throw new ParserValidationException( intStream, loc,
-                      "Only REPL, REPLICATED, HASH, BLOOM, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
+                      "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
          }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/parser/PigMacro.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/PigMacro.java?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/PigMacro.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/PigMacro.java Fri Feb 24 03:34:37 2017
@@ -168,9 +168,14 @@ class PigMacro {
 
             Map<String, String> paramVal = pc.getParamVal();
             for (Map.Entry<String, String> e : pigContext.getParamVal().entrySet()) {
-                // overwrite=false since macro parameters should have precedence
-                // over commandline parameters (if keys overlap)
-                pc.processOrdLine(e.getKey(), e.getValue(), false);
+                if (paramVal.containsKey(e.getKey())) {
+                    throw new ParserException(
+                        "Macro contains argument or return value " + e.getKey() + " which conflicts " +
+                        "with a Pig parameter of the same name."
+                    );
+                } else {
+                    paramVal.put(e.getKey(), e.getValue());
+                }
             }
             
             ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(pc);
@@ -214,7 +219,6 @@ class PigMacro {
         try {
             result = parser.query();
         } catch (RecognitionException e) {
-            e.line += startLine -1;
             String msg = (fileName == null) ? parser.getErrorHeader(e)
                     : QueryParserUtils.generateErrorHeader(e, fileName);
             msg += " " + parser.getErrorMessage(e, parser.getTokenNames());
@@ -232,7 +236,7 @@ class PigMacro {
         if (!macroDefNodes.isEmpty()) {
             String fname = ((PigParserNode)ast).getFileName();
             String msg = getErrorMessage(fname, ast.getLine(),
-                    "Invalid macro definition", "macro '" + name
+                    "Invalide macro definition", "macro '" + name
                             + "' contains macro definition.\nmacro content: "
                             + body);
             throw new ParserException(msg);
@@ -269,7 +273,6 @@ class PigMacro {
         try {
             result2 = walker.query();
         } catch (RecognitionException e) {
-            e.line += startLine - 1;
             String msg = walker.getErrorHeader(e) + " "
                     + walker.getErrorMessage(e, walker.getTokenNames());
             String msg2 = getErrorMessage(file, line, "Failed to mask macro '"

Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1784224&r1=1784223&r2=1784224&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Fri Feb 24 03:34:37 2017
@@ -889,8 +889,6 @@ scalar : INTEGER
        | LONGINTEGER
        | FLOATNUMBER
        | DOUBLENUMBER
-       | BIGINTEGERNUMBER
-       | BIGDECIMALNUMBER
        | QUOTEDSTRING
        | NULL
        | TRUE