You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC

svn commit: r1783988 [13/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Modified: pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java Wed Feb 22 09:43:41 2017
@@ -51,10 +51,9 @@ public class SpillableMemoryManager impl
 
     private static final Log log = LogFactory.getLog(SpillableMemoryManager.class);
 
-    private static final int ONE_GB = 1024 * 1024 * 1024;
     private static final int UNUSED_MEMORY_THRESHOLD_DEFAULT = 350 * 1024 * 1024;
-    private static final double MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7;
-    private static final double COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7;
+    private static final float MEMORY_THRESHOLD_FRACTION_DEFAULT = 0.7f;
+    private static final float COLLECTION_THRESHOLD_FRACTION_DEFAULT = 0.7f;
 
     private LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
     // References to spillables with size
@@ -86,7 +85,7 @@ public class SpillableMemoryManager impl
 
     // fraction of the total heap used for the threshold to determine
     // if we want to perform an extra gc before the spill
-    private double extraGCThresholdFraction = 0.05;
+    private float extraGCThresholdFraction = 0.05f;
     private long extraGCSpillSizeThreshold  = 0L;
 
     private volatile boolean blockRegisterOnSpill = false;
@@ -142,7 +141,7 @@ public class SpillableMemoryManager impl
      * @param unusedMemoryThreshold
      *            Unused memory size below which we want to get notifications
      */
-    private void configureMemoryThresholds(double memoryThresholdFraction, double collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
+    private void configureMemoryThresholds(float memoryThresholdFraction, float collectionMemoryThresholdFraction, long unusedMemoryThreshold) {
         long tenuredHeapSize = tenuredHeap.getUsage().getMax();
         memoryThresholdSize = (long)(tenuredHeapSize * memoryThresholdFraction);
         collectionThresholdSize = (long)(tenuredHeapSize * collectionMemoryThresholdFraction);
@@ -184,8 +183,8 @@ public class SpillableMemoryManager impl
 
         spillFileSizeThreshold = conf.getLong("pig.spill.size.threshold", spillFileSizeThreshold);
         gcActivationSize = conf.getLong("pig.spill.gc.activation.size", gcActivationSize);
-        double memoryThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT);
-        double collectionThresholdFraction = conf.getDouble(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT);
+        float memoryThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_MEMORY_USAGE_THRESHOLD_FRACTION, MEMORY_THRESHOLD_FRACTION_DEFAULT);
+        float collectionThresholdFraction = conf.getFloat(PigConfiguration.PIG_SPILL_COLLECTION_THRESHOLD_FRACTION, COLLECTION_THRESHOLD_FRACTION_DEFAULT);
         long unusedMemoryThreshold = conf.getLong(PigConfiguration.PIG_SPILL_UNUSED_MEMORY_THRESHOLD_SIZE, UNUSED_MEMORY_THRESHOLD_DEFAULT);
         configureMemoryThresholds(memoryThresholdFraction, collectionThresholdFraction, unusedMemoryThreshold);
     }
@@ -199,7 +198,7 @@ public class SpillableMemoryManager impl
         // used - heapmax/2 + heapmax/4
         long toFree = 0L;
         if(n.getType().equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
-            toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);
+            toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call- Usage threshold "
@@ -211,7 +210,7 @@ public class SpillableMemoryManager impl
                 log.debug(msg);
             }
         } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
-            toFree = info.getUsage().getUsed() - memoryThresholdSize + (long)(memoryThresholdSize * 0.5);
+            toFree = info.getUsage().getUsed() - collectionThresholdSize + (long)(collectionThresholdSize * 0.5);
 
             //log
             String msg = "memory handler call - Collection threshold "

Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Wed Feb 22 09:43:41 2017
@@ -48,6 +48,7 @@ import org.apache.hadoop.io.compress.BZi
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
@@ -93,18 +94,10 @@ public class Utils {
     	  return System.getProperty("java.vendor").contains("IBM");
     }
 
-    public static boolean isHadoop23() {
-        String version = org.apache.hadoop.util.VersionInfo.getVersion();
-        if (version.matches("\\b0\\.23\\..+\\b"))
-            return true;
-        return false;
-    }
-
-    public static boolean isHadoop2() {
-        String version = org.apache.hadoop.util.VersionInfo.getVersion();
-        if (version.matches("\\b2\\.\\d+\\..+"))
-            return true;
-        return false;
+    public static boolean is64bitJVM() {
+        // Read sun.arch.data.model, falling back to com.ibm.vm.bitmode when it is unset.
+        String fallbackBitMode = System.getProperty("com.ibm.vm.bitmode");
+        return "64".equals(System.getProperties().getProperty("sun.arch.data.model", fallbackBitMode));
     }
 
     /**
@@ -574,6 +567,11 @@ public class Utils {
         return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
     }
 
+    public static boolean isLocal(Configuration conf) {
+        boolean localExecType = conf.getBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, false); // both flags default to false
+        return localExecType || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
+    }
+
     // PIG-3929 use parameter substitution for pig properties similar to Hadoop Configuration
     // Following code has been borrowed from Hadoop's Configuration#substituteVars
     private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
@@ -697,4 +695,15 @@ public class Utils {
             DateTimeZone.setDefault(DateTimeZone.forID(dtzStr));
         }
     }
+
+    /**
+     * Registers a shutdown hook whose priority is expressed relative to the FileSystem cache shutdown hook.
+     *
+     * @param hook code to execute during shutdown
+     * @param priority offset that is added to FileSystem.SHUTDOWN_HOOK_PRIORITY
+     */
+    public static void addShutdownHookWithPriority(Runnable hook, int priority) {
+        int effectivePriority = FileSystem.SHUTDOWN_HOOK_PRIORITY + priority;
+        ShutdownHookManager.get().addShutdownHook(hook, effectivePriority);
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroStorageDataConversionUtilities.java Wed Feb 22 09:43:41 2017
@@ -118,6 +118,8 @@ public class AvroStorageDataConversionUt
         return ByteBuffer.wrap(((DataByteArray) o).get());
       case FIXED:
         return new GenericData.Fixed(s, ((DataByteArray) o).get());
+      case ENUM:
+        return new GenericData.EnumSymbol(s,o.toString());
       default:
         if (DataType.findType(o) == DataType.DATETIME) {
           return ((DateTime) o).getMillis();

Modified: pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/avro/AvroTupleWrapper.java Wed Feb 22 09:43:41 2017
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -49,6 +50,7 @@ import java.util.Map;
 public final class AvroTupleWrapper <T extends IndexedRecord>
     implements Tuple {
     private static final Log LOG = LogFactory.getLog(AvroTupleWrapper.class);
+    private TupleFactory mTupleFactory = TupleFactory.getInstance();
 
   /**
    * The Avro object wrapped in the pig Tuple.
@@ -64,9 +66,9 @@ public final class AvroTupleWrapper <T e
   }
 
   @Override
-  public void write(final DataOutput o) throws IOException {
-    throw new IOException(
-        this.getClass().toString() + ".write called, but not implemented yet");
+  public void write(DataOutput out) throws IOException {
+      // Wrap the values from getAll() in a factory-made tuple and let that tuple serialize itself.
+      mTupleFactory.newTupleNoCopy(getAll()).write(out);
   }
 
   @SuppressWarnings("rawtypes")

Modified: pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/FilterExtractor.java Wed Feb 22 09:43:41 2017
@@ -98,13 +98,17 @@ public abstract class FilterExtractor {
     public void visit() throws FrontendException {
         // we will visit the leaf and it will recursively walk the plan
         LogicalExpression leaf = (LogicalExpression)originalPlan.getSources().get( 0 );
-        // if the leaf is a unary operator it should be a FilterFunc in
-        // which case we don't try to extract partition filter conditions
-        if(leaf instanceof BinaryExpression) {
-            // recursively traverse the tree bottom up
-            // checkPushdown returns KeyState which is pair of LogicalExpression
-            BinaryExpression binExpr = (BinaryExpression)leaf;
-            KeyState finale = checkPushDown(binExpr);
+
+        // recursively traverse the tree bottom up
+        // checkPushdown returns KeyState which is pair of LogicalExpression
+        KeyState finale = null;
+        if (leaf instanceof BinaryExpression) {
+            finale = checkPushDown((BinaryExpression) leaf);
+        } else if (leaf instanceof UnaryExpression) {
+            finale = checkPushDown((UnaryExpression) leaf);
+        }
+
+        if (finale != null) {
             this.filterExpr = finale.filterExpr;
             this.pushdownExpr = getExpression(finale.pushdownExpr);
         }
@@ -278,12 +282,22 @@ public abstract class FilterExtractor {
             if (unaryExpr instanceof CastExpression) {
                 return checkPushDown(unaryExpr.getExpression());
             }
-            if (unaryExpr instanceof IsNullExpression) {
-                state.pushdownExpr = unaryExpr;
-                state.filterExpr = null;
-            } else if (unaryExpr instanceof NotExpression) {
-                state.pushdownExpr = unaryExpr;
-                state.filterExpr = null;
+            // For IsNull, the child may not be a supported expression, e.g. MapLookupExpression.
+            // For NotExpression, the child, C, is broken into expressions P and F such that C = P AND F
+            // Consequently, NOT C = NOT P OR NOT F, which can't be expressed as an AND so both must be
+            // pushed or both used as a filter.
+            // For both cases, this expr can be pushed if and only if the entire child can be.
+            if (unaryExpr instanceof IsNullExpression || unaryExpr instanceof NotExpression) {
+                KeyState childState = checkPushDown(unaryExpr.getExpression());
+                if (childState.filterExpr == null) {
+                    // only push down if the entire expression can be pushed
+                    state.pushdownExpr = unaryExpr;
+                    state.filterExpr = null;
+                } else {
+                    removeFromFilteredPlan(childState.filterExpr);
+                    state.filterExpr = addToFilterPlan(unaryExpr);
+                    state.pushdownExpr = null;
+                }
             } else {
                 state.filterExpr = addToFilterPlan(unaryExpr);
                 state.pushdownExpr = null;

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Wed Feb 22 09:43:41 2017
@@ -323,6 +323,7 @@ public class ExpToPhyTranslationVisitor
     public void visit( CastExpression op ) throws FrontendException {
         POCast pCast = new POCast(new OperatorKey(DEFAULT_SCOPE, nodeGen
                 .getNextNodeId(DEFAULT_SCOPE)));
+        pCast.addOriginalLocation(op.getFieldSchema().alias, op.getLocation()) ;
 //        physOp.setAlias(op.getAlias());
         currentPlan.add(pCast);
 

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java Wed Feb 22 09:43:41 2017
@@ -95,7 +95,8 @@ public class MapLookupExpression extends
         LogicalFieldSchema predFS = successor.getFieldSchema();
         if (predFS!=null) {
             if (predFS.type==DataType.MAP && predFS.schema!=null) {
-                return (predFS.schema.getField(0));
+                fieldSchema = predFS.schema.getField(0);
+                return fieldSchema;
             }
             else {
                 fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Wed Feb 22 09:43:41 2017
@@ -37,6 +37,7 @@ public class LOGenerate extends LogicalR
      // to store uid in mUserDefinedSchema
      private List<LogicalSchema> mUserDefinedSchema = null;
      private List<LogicalSchema> outputPlanSchemas = null;
+     private List<LogicalSchema> expSchemas = null;
      // If LOGenerate generate new uid, cache it here.
      // This happens when expression plan does not have complete schema, however,
      // user give complete schema in ForEach statement in script
@@ -71,6 +72,7 @@ public class LOGenerate extends LogicalR
         
         schema = new LogicalSchema();
         outputPlanSchemas = new ArrayList<LogicalSchema>();
+        expSchemas = new ArrayList<LogicalSchema>();
         
         for(int i=0; i<outputPlans.size(); i++) {
             LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
@@ -93,19 +95,17 @@ public class LOGenerate extends LogicalR
                 fieldSchema = exp.getFieldSchema().deepCopy();
                 
                 expSchema = new LogicalSchema();
-                if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG)||!flattenFlags[i]) {
+                if ((fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG && fieldSchema.type != DataType.MAP) || !flattenFlags[i]) {
                     // if type is primitive, just add to schema
-                    if (fieldSchema!=null)
+                    if (fieldSchema != null)
                         expSchema.addField(fieldSchema);
-                    else
-                        expSchema = null;
                 } else {
-                    // if bag/tuple don't have inner schema, after flatten, we don't have schema for the entire operator
+                    // if bag/tuple/map don't have inner schema, after flatten, we don't have schema for the entire operator
                     if (fieldSchema.schema==null) {
                         expSchema = null;
                     }
                     else {
-                     // if we come here, we get a BAG/Tuple with flatten, extract inner schema of the tuple as expSchema
+                     // if we come here, we get a BAG/Tuple/Map with flatten, extract inner schema of the tuple as expSchema
                         List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
                         if (flattenFlags[i]) {
                             if (fieldSchema.type == DataType.BAG) {
@@ -117,13 +117,23 @@ public class LOGenerate extends LogicalR
                                         fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                     }
                                 }
+                            } else if (fieldSchema.type == DataType.MAP) {
+                                //should only contain 1 schemafield for Map's value
+                                innerFieldSchemas = fieldSchema.schema.getFields();
+                                LogicalSchema.LogicalFieldSchema fsForValue = innerFieldSchemas.get(0);
+                                fsForValue.alias = fieldSchema.alias + "::value";
+
+                                LogicalSchema.LogicalFieldSchema fsForKey = new LogicalFieldSchema(
+                                        fieldSchema.alias + "::key" , null, DataType.CHARARRAY, fieldSchema.uid);
+
+                                expSchema.addField(fsForKey);
                             } else { // DataType.TUPLE
                                 innerFieldSchemas = fieldSchema.schema.getFields();
                                 for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
                                     fs.alias = fs.alias == null ? null : fieldSchema.alias + "::" + fs.alias;
                                 }
                             }
-                            
+
                             for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
                                 expSchema.addField(fs);
                         }
@@ -137,6 +147,7 @@ public class LOGenerate extends LogicalR
             if (expSchema!=null && expSchema.size()==0)
                 expSchema = null;
             LogicalSchema planSchema = new LogicalSchema();
+            expSchemas.add(expSchema);
             if (mUserDefinedSchemaCopy!=null) {
                 LogicalSchema mergedSchema = new LogicalSchema();
                 // merge with userDefinedSchema
@@ -146,12 +157,6 @@ public class LOGenerate extends LogicalR
                         fs.stampFieldSchema();
                         mergedSchema.addField(new LogicalFieldSchema(fs));
                     }
-                    for (LogicalFieldSchema fs : mergedSchema.getFields()) {
-                        if (fs.type == DataType.NULL){
-                            //this is the use case where a new alias has been specified by user
-                            fs.type = DataType.BYTEARRAY;
-                        }
-                    }
                 } else {
 
                     // Merge uid with the exp field schema
@@ -163,8 +168,12 @@ public class LOGenerate extends LogicalR
                     mergedSchema.mergeUid(expSchema);
 
                 }
-                for (LogicalFieldSchema fs : mergedSchema.getFields())
+                for (LogicalFieldSchema fs : mergedSchema.getFields()) {
+                    if (fs.type==DataType.NULL) {
+                        fs.type = DataType.BYTEARRAY;
+                    }
                     planSchema.addField(fs);
+                }
             } else {
                 // if any plan do not have schema, the whole LOGenerate do not have schema
                 if (expSchema==null) {
@@ -310,4 +319,8 @@ public class LOGenerate extends LogicalR
         super.resetSchema();
         outputPlanSchemas = null;
     }
+
+    public List<LogicalSchema> getExpSchemas() {
+        return this.expSchemas; // one entry is added per output plan; an entry is null when that plan has no schema
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LOJoin.java Wed Feb 22 09:43:41 2017
@@ -38,6 +38,7 @@ public class LOJoin extends LogicalRelat
      */
     public static enum JOINTYPE {
         HASH,    // Hash Join
+        BLOOM,   // Bloom Join
         REPLICATED, // Fragment Replicated join
         SKEWED, // Skewed Join
         MERGE,   // Sort Merge Join

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Wed Feb 22 09:43:41 2017
@@ -1414,7 +1414,7 @@ public class LogToPhyTranslationVisitor
 
             return;
         }
-        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
+        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH || loj.getJoinType() == LOJoin.JOINTYPE.BLOOM){
             POPackage poPackage = compileToLR_GR_PackTrio(loj, loj.getCustomPartitioner(), innerFlags, loj.getExpressionPlans());
             POForEach fe = compileFE4Flattening(innerFlags,  scope, parallel, alias, location, inputs);
             currentPlan.add(fe);
@@ -1425,7 +1425,20 @@ public class LogToPhyTranslationVisitor
                         e.getErrorCode(),e.getErrorSource(),e);
             }
             logToPhyMap.put(loj, fe);
-            poPackage.getPkgr().setPackageType(PackageType.JOIN);
+            if (loj.getJoinType() == LOJoin.JOINTYPE.BLOOM) {
+                if (innerFlags.length == 2) {
+                    if (innerFlags[0] == false && innerFlags[1] == false) {
+                        throw new LogicalToPhysicalTranslatorException(
+                                "Error at " + loj.getLocation() + " with alias "+ loj.getAlias() +
+                                        ". Bloom join cannot be used with a FULL OUTER join.",
+                                1109,
+                                PigException.INPUT);
+                    }
+                }
+                poPackage.getPkgr().setPackageType(PackageType.BLOOMJOIN);
+            } else {
+                poPackage.getPkgr().setPackageType(PackageType.JOIN);
+            }
         }
         translateSoftLinks(loj);
     }

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Wed Feb 22 09:43:41 2017
@@ -48,6 +48,7 @@ import org.apache.pig.newplan.logical.vi
 import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
 import org.apache.pig.newplan.logical.visitor.DanglingNestedNodeRemover;
 import org.apache.pig.newplan.logical.visitor.DuplicateForEachColumnRewriteVisitor;
+import org.apache.pig.newplan.logical.visitor.ForEachUserSchemaVisitor;
 import org.apache.pig.newplan.logical.visitor.ImplicitSplitInsertVisitor;
 import org.apache.pig.newplan.logical.visitor.InputOutputFileValidatorVisitor;
 import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
@@ -175,6 +176,7 @@ public class LogicalPlan extends BaseOpe
         new ColumnAliasConversionVisitor(this).visit();
         new SchemaAliasVisitor(this).visit();
         new ScalarVisitor(this, pigContext, scope).visit();
+        new ForEachUserSchemaVisitor(this).visit();
 
         // ImplicitSplitInsertVisitor has to be called before
         // DuplicateForEachColumnRewriteVisitor.  Detail at pig-1766
@@ -189,6 +191,15 @@ public class LogicalPlan extends BaseOpe
 
         new TypeCheckingRelVisitor( this, collector).visit();
 
+
+        new UnionOnSchemaSetter(this).visit();
+        new CastLineageSetter(this, collector).visit();
+        new ScalarVariableValidator(this).visit();
+        new StoreAliasSetter(this).visit();
+
+        // compute whether output data is sorted or not
+        new SortInfoSetter(this).visit();
+
         boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
 
         if(aggregateWarning) {
@@ -199,14 +210,6 @@ public class LogicalPlan extends BaseOpe
             }
         }
 
-        new UnionOnSchemaSetter(this).visit();
-        new CastLineageSetter(this, collector).visit();
-        new ScalarVariableValidator(this).visit();
-        new StoreAliasSetter(this).visit();
-
-        // compute whether output data is sorted or not
-        new SortInfoSetter(this).visit();
-
         if (!(skipInputOutputValidation || pigContext.inExplain || pigContext.inDumpSchema)) {
             // Validate input/output file
             new InputOutputFileValidatorVisitor(this, pigContext).visit();

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Wed Feb 22 09:43:41 2017
@@ -150,6 +150,39 @@ public class LogicalSchema {
             }
             return true;
         }
+
+        // Returns true when fs1 and fs2 have the same type; for complex types the inner schemas are compared field by field
+        public static boolean typeMatch(LogicalFieldSchema fs1, LogicalFieldSchema fs2) {
+            if (fs1==null && fs2==null) { // two missing field schemas match
+                return true;
+            }
+            if (fs1==null || fs2==null) { // exactly one is missing
+                return false;
+            }
+            if (fs1.type!=fs2.type) {
+                return false;
+            }
+            if (DataType.isComplex(fs1.type)) {
+                LogicalSchema s1 = fs1.schema;
+                LogicalSchema s2 = fs2.schema;
+                if (s1==null && s2==null) { // neither side declares an inner schema
+                    return true;
+                }
+                if (s1==null || s2==null) { // only one inner schema present; testing fs1/fs2 here led to an NPE on size()
+                    return false;
+                }
+                if (s1.size()!=s2.size()) {
+                    return false;
+                }
+                for (int i=0;i<s1.size();i++) { // recurse into the fields position by position
+                    if (!typeMatch(s1.getField(i), s2.getField(i))) {
+                        return false;
+                    }
+                }
+            }
+            return true;
+        }
+
         /**
          * Adds the uid from FieldSchema argument to this FieldSchema
          * If the argument is null, it stamps this FieldSchema with uid
@@ -447,7 +480,23 @@ public class LogicalSchema {
             LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
             return mergedFS;
         }
-        
+
+        public static boolean isEqualUnlessUnknown(LogicalFieldSchema fs1, LogicalFieldSchema fs2) throws FrontendException {
+            // A bytearray on either side is treated as unknown and matches anything.
+            if (fs1.type == DataType.BYTEARRAY || fs2.type == DataType.BYTEARRAY) {
+                return true;
+            }
+            // Two different non-bytearray types never match.
+            if (fs1.type != fs2.type) {
+                return false;
+            }
+            // Same type: only complex types need their inner schemas compared.
+            if (!DataType.isComplex(fs1.type)) {
+                return true;
+            }
+            return LogicalSchema.isEqualUnlessUnknown(fs1.schema, fs2.schema);
+        }
+
         /***
          * Old Pig field schema does not require a tuple schema inside a bag;
          * Now it is required to have that; this method is to fill the gap
@@ -770,7 +819,24 @@ public class LogicalSchema {
         }
         return mergedSchema;
     }
-    
+
+    public static boolean isEqualUnlessUnknown(LogicalSchema s1, LogicalSchema s2) throws FrontendException {
+        // A null schema is treated as unknown and is therefore compatible with any schema.
+        if (s1 == null || s2 == null) {
+            return true;
+        }
+        if (s1.size() != s2.size()) {
+            return false;
+        }
+        for (int i = 0; i < s1.size(); i++) {
+            // Compare s1's field with s2's field; passing s1.getField(i) twice made this check always succeed.
+            if (!LogicalFieldSchema.isEqualUnlessUnknown(s1.getField(i), s2.getField(i))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     public String toString(boolean verbose) {
         StringBuilder str = new StringBuilder();
         

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/AddForEach.java Wed Feb 22 09:43:41 2017
@@ -95,7 +95,7 @@ public class AddForEach extends WholePla
             }
             
             Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
-            if (outputUids==null)
+            if (outputUids==null || outputUids.size() == 0 )
                 return false;
             
             LogicalSchema schema = op.getSchema();

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java Wed Feb 22 09:43:41 2017
@@ -107,7 +107,7 @@ public class CastLineageSetter extends A
                 if(inLoadFunc == null){
                     String msg = "Cannot resolve load function to use for casting from " + 
                                 DataType.findTypeName(inType) + " to " +
-                                DataType.findTypeName(outType) + ". ";
+                                DataType.findTypeName(outType) + " at " + cast.getLocation() ;
                     msgCollector.collect(msg, MessageType.Warning,
                            PigWarning.NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY);
                 }else {

Added: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java (added)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/ForEachUserSchemaVisitor.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.visitor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+public class ForEachUserSchemaVisitor extends LogicalRelationalNodesVisitor { // appends a casting FOREACH after a FOREACH whose user-defined schema needs casts
+    public ForEachUserSchemaVisitor(OperatorPlan plan) throws FrontendException {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+    // Positionally merges the two schemas field by field; if only one is non-null, returns a deep copy of it.
+    private static LogicalSchema replaceNullByteArraySchema(
+                         LogicalSchema originalSchema,
+                         LogicalSchema userSchema) throws FrontendException {
+        if( originalSchema == null && userSchema == null ) {
+            return null;
+        } else if ( originalSchema == null ) {
+            return userSchema.deepCopy();
+        } else if ( userSchema == null ) {
+            return originalSchema.deepCopy();
+        }
+        // NOTE(review): assumes userSchema has at least originalSchema.size() fields - confirm callers guarantee this
+        LogicalSchema replacedSchema = new LogicalSchema();
+        for (int i=0;i<originalSchema.size();i++) {
+            LogicalFieldSchema replacedFS = replaceNullByteArrayFieldSchema(originalSchema.getField(i), userSchema.getField(i));
+            replacedSchema.addField(replacedFS);
+        }
+        return replacedSchema;
+    }
+    // Per-field merge: a NULL/BYTEARRAY type on one side yields the other side's type; with both present the alias comes from userFS.
+    private static LogicalFieldSchema replaceNullByteArrayFieldSchema(
+                         LogicalFieldSchema originalFS,
+                         LogicalFieldSchema userFS) throws FrontendException {
+        if( originalFS == null && userFS == null ) {
+            return null;
+        } else if ( originalFS == null ) {
+            return userFS.deepCopy();
+        } else if ( userFS == null ) {
+            return originalFS.deepCopy();
+        }
+        if ( originalFS.type==DataType.NULL
+            || originalFS.type==DataType.BYTEARRAY ) {
+            return userFS.deepCopy();
+        } else if ( userFS.type==DataType.NULL
+            || userFS.type==DataType.BYTEARRAY ) {
+            // Use originalFS schema and type but keep the alias from userFS
+            return new LogicalFieldSchema(userFS.alias, originalFS.schema,  originalFS.type);
+        }
+        // Both sides carry a concrete type: userFS wins, recursing into inner schemas for schema types.
+        if ( !DataType.isSchemaType(originalFS.type) ) {
+            return userFS.deepCopy();
+        } else {
+            LogicalSchema replacedSchema = replaceNullByteArraySchema(originalFS.schema, userFS.schema);
+            return new LogicalFieldSchema(userFS.alias, replacedSchema, userFS.type);
+        }
+    }
+    // True when fs, and recursively every nested field of a schema type, is NULL or BYTEARRAY; a schema type with a null schema counts as true.
+    private static boolean hasOnlyNullOrByteArraySchema (LogicalFieldSchema fs) {
+        if( DataType.isSchemaType(fs.type) ) {
+            if( fs.schema != null ) {
+                for (LogicalFieldSchema sub_fs : fs.schema.getFields() ) {
+                    if( !hasOnlyNullOrByteArraySchema(sub_fs)  ) {
+                        return false;
+                    }
+                }
+            }
+        } else if( fs.type != DataType.NULL && fs.type != DataType.BYTEARRAY )  {
+            return false;
+        }
+        return true;
+    }
+    // Builds a follow-up LOForEach that projects every column (casting where needed) and splices it in after 'foreach' only if a cast is required.
+    @Override
+    public void visit(LOForEach foreach) throws FrontendException {
+        LOGenerate generate = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
+        List<LogicalSchema> mExpSchemas = generate.getExpSchemas();
+        List<LogicalSchema> mUserDefinedSchemas = generate.getUserDefinedSchema();
+
+        // Skip if no way to figure out schema (usually both expression schema and
+        // user defined schema are null)
+        if (foreach.getSchema()==null) {
+            return;
+        }
+
+        if (mUserDefinedSchemas==null) {
+            return;
+        }
+
+        boolean hasUserDefinedSchema = false;
+        for (LogicalSchema mUserDefinedSchema : mUserDefinedSchemas) {
+            if (mUserDefinedSchema!=null) {
+                hasUserDefinedSchema = true;
+                break;
+            }
+        }
+
+        if (!hasUserDefinedSchema) {
+            return;
+        }
+
+        if (mExpSchemas.size()!=mUserDefinedSchemas.size()) {
+            throw new FrontendException("Size mismatch: Get " + mExpSchemas.size() +
+                    " mExpSchemas, but " + mUserDefinedSchemas.size() + " mUserDefinedSchemas",
+                    0, generate.getLocation());
+        }
+
+        LogicalPlan innerPlan = new LogicalPlan();
+        LOForEach casterForEach = new LOForEach(plan);
+        casterForEach.setInnerPlan(innerPlan);
+        casterForEach.setAlias(foreach.getAlias());
+
+        List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
+        LOGenerate gen = new LOGenerate(innerPlan, exps, null);
+        innerPlan.add(gen);
+        // 'index' is the running column number across all generate expressions
+        int index = 0;
+        boolean needCast = false;
+        for(int i=0;i<mExpSchemas.size();i++) {
+            LogicalSchema mExpSchema = mExpSchemas.get(i);
+            LogicalSchema mUserDefinedSchema = mUserDefinedSchemas.get(i);
+
+            // Use user defined schema to cast, this is the prevailing use case
+            if (mExpSchema==null) {
+                for (LogicalFieldSchema fs : mUserDefinedSchema.getFields()) { // NOTE(review): assumes mUserDefinedSchema != null here - confirm
+                    if (hasOnlyNullOrByteArraySchema(fs)) {
+                        addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                    } else {
+                        addToExps(casterForEach, innerPlan, gen, exps, index, true, fs);
+                        needCast = true;
+                    }
+                    index++;
+                }
+                continue;
+            }
+
+            // No user defined schema, no need to cast
+            if (mUserDefinedSchema==null) {
+                for (int j=0;j<mExpSchema.size();j++) {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                    index++;
+                }
+                continue;
+            }
+
+            // Expression has a schema and the user also defined one: cast only
+            // when there is a mismatch
+            if (mExpSchema.size()!=mUserDefinedSchema.size()) {
+                throw new FrontendException("Size mismatch: Cannot cast " + mExpSchema.size() +
+                        " fields to " + mUserDefinedSchema.size(), 0, foreach.getLocation());
+            }
+
+            LogicalSchema replacedSchema = replaceNullByteArraySchema(mExpSchema,mUserDefinedSchema);
+            for (int j=0;j<mExpSchema.size();j++) {
+                LogicalFieldSchema mExpFieldSchema = mExpSchema.getField(j);
+                LogicalFieldSchema mUserDefinedFieldSchema = replacedSchema.getField(j);
+
+                if (hasOnlyNullOrByteArraySchema(mUserDefinedFieldSchema) ||
+                    LogicalFieldSchema.typeMatch(mExpFieldSchema, mUserDefinedFieldSchema)) {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, false, null);
+                } else {
+                    addToExps(casterForEach, innerPlan, gen, exps, index, true, mUserDefinedFieldSchema);
+                    needCast = true;
+                }
+                index++;
+            }
+        }
+        // one flatten flag per projected column, all left at the default false
+        gen.setFlattenFlags(new boolean[index]);
+        if (needCast) {
+            // Insert the casterForEach into the plan and patch up the plan.
+            List <Operator> successorOps = plan.getSuccessors(foreach);
+            if (successorOps != null && successorOps.size() > 0){
+                Operator next = plan.getSuccessors(foreach).get(0);
+                plan.insertBetween(foreach, casterForEach, next);
+            }else{
+                plan.add(casterForEach);
+                plan.connect(foreach,casterForEach);
+            }
+
+            // Since the explicit cast is now inserted after the original foreach,
+            // throw away the user defined "types" but keep the user
+            // defined names from the original foreach.
+            // 'generate' (LOGenerate) still holds the reference to this
+            // mUserDefinedSchemas
+            for( LogicalSchema mUserDefinedSchema : mUserDefinedSchemas ) {
+                resetTypeToNull( mUserDefinedSchema );
+            }
+        }
+    }
+    // Sets the type of every non-schema-type field to DataType.NULL in place, recursing into nested schemas.
+    private void resetTypeToNull (LogicalSchema s1) {
+        if( s1 != null ) {
+            for (LogicalFieldSchema fs : s1.getFields()) {
+                if( DataType.isSchemaType(fs.type) ) {
+                    resetTypeToNull(fs.schema);
+                } else {
+                    fs.type = DataType.NULL;
+                }
+            }
+        }
+    }
+    // Adds an LOInnerLoad for column 'index' feeding 'gen', plus an expression plan projecting it (cast to 'fs' when needCaster), appended to 'exps'.
+    private void addToExps(LOForEach casterForEach, LogicalPlan innerPlan, LOGenerate gen,
+            List<LogicalExpressionPlan> exps, int index, boolean needCaster, LogicalFieldSchema fs) {
+
+        LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, casterForEach, index);
+        innerPlan.add(innerLoad);
+        innerPlan.connect(innerLoad, gen);
+
+        LogicalExpressionPlan exp = new LogicalExpressionPlan();
+
+        ProjectExpression prj = new ProjectExpression(exp, index, 0, gen);
+        exp.add(prj);
+
+        if (needCaster) {
+            CastExpression cast = new CastExpression(exp, prj, new LogicalSchema.LogicalFieldSchema(fs));
+            exp.add(cast);
+        }
+        exps.add(exp);
+    }
+}

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Wed Feb 22 09:43:41 2017
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.pig.EvalFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.LoadFunc;
@@ -729,6 +730,44 @@ public class LineageFindRelVisitor exten
             }
         }
 
+        @Override
+        public void visit(UserFuncExpression op) throws FrontendException {
+            // Nothing to record when the function has no output field schema.
+            if (op.getFieldSchema() == null) {
+                return;
+            }
+
+            FuncSpec funcSpec = null;
+            Class loader = instantiateCaster(op.getFuncSpec());
+            List<LogicalExpression> arguments = op.getArguments();
+            if (loader != null) {
+                // instantiateCaster returned something for the function itself: use its own FuncSpec.
+                funcSpec = op.getFuncSpec();
+            } else if (!arguments.isEmpty()) {
+                // Otherwise take the first argument's FuncSpec, but only if
+                // haveIdenticalCasters accepts every remaining argument's FuncSpec.
+                LogicalFieldSchema firstFs = arguments.get(0).getFieldSchema();
+                FuncSpec candidate = (firstFs == null) ? null : uid2LoadFuncMap.get(firstFs.uid);
+                boolean allAgree = (candidate != null);
+                for (int i = 1; allAgree && i < arguments.size(); i++) {
+                    LogicalFieldSchema argFs = arguments.get(i).getFieldSchema();
+                    allAgree = argFs != null
+                            && haveIdenticalCasters(candidate, uid2LoadFuncMap.get(argFs.uid));
+                }
+                if (allAgree) {
+                    funcSpec = candidate;
+                }
+            }
+
+            if (funcSpec == null) {
+                return;
+            }
+
+            addUidLoadFuncToMap(op.getFieldSchema().uid, funcSpec);
+            // in case schema is nested, set funcSpec for all
+            setLoadFuncForUids(op.getFieldSchema().schema, funcSpec);
+        }
+
         /**
          * if there is a null constant under casts, return it
          * @param rel
@@ -770,6 +809,8 @@ public class LineageFindRelVisitor exten
                 caster = ((LoadFunc)obj).getLoadCaster();
             } else if (obj instanceof StreamToPig) {
                 caster = ((StreamToPig)obj).getLoadCaster();
+            } else if (obj instanceof EvalFunc) {
+                caster = ((EvalFunc)obj).getLoadCaster();
             } else {
                 throw new VisitorException("Invalid class type " + funcSpec.getClassName(),
                                            2270, PigException.BUG );

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java Wed Feb 22 09:43:41 2017
@@ -458,6 +458,7 @@ public class TypeCheckingExpVisitor exte
         collectCastWarning(node, arg.getType(), toFs.type, msgCollector);
 
         CastExpression cast = new CastExpression(plan, arg, toFs);
+        cast.setLocation(node.getLocation());
         try {
             // disconnect cast and arg because the connection is already
             // added by cast constructor and insertBetween call is going
@@ -490,7 +491,7 @@ public class TypeCheckingExpVisitor exte
         byte outType = cast.getType();
         if(outType == DataType.BYTEARRAY && inType != outType) {
             int errCode = 1051;
-            String msg = "Cannot cast to bytearray";
+            String msg = "Cannot cast from " + DataType.findTypeName(inType) + " to bytearray";
             msgCollector.collect(msg, MessageType.Error) ;
             throw new TypeCheckerException(cast, msg, errCode, PigException.INPUT) ;
         }
@@ -607,7 +608,7 @@ public class TypeCheckingExpVisitor exte
             // Matching schemas if we're working with tuples/bags
             if (DataType.isSchemaType(lhsType)) {
                 try {
-                    if(! binCond.getLhs().getFieldSchema().isEqual(binCond.getRhs().getFieldSchema())){
+                    if(!LogicalFieldSchema.isEqualUnlessUnknown(binCond.getLhs().getFieldSchema(), binCond.getRhs().getFieldSchema())){
                         int errCode = 1048;
                         String msg = "Two inputs of BinCond must have compatible schemas."
                             + " left hand side: " + binCond.getLhs().getFieldSchema()

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Wed Feb 22 09:43:41 2017
@@ -351,7 +351,8 @@ public class TypeCheckingRelVisitor exte
 
             if (outFieldSchema.type != fs.type) {
                 castNeededCounter++ ;
-                new CastExpression(genPlan, project, outFieldSchema);
+                CastExpression castexp = new CastExpression(genPlan, project, outFieldSchema);
+                castexp.setLocation(toOp.getLocation());
             }
 
             generatePlans.add(genPlan) ;

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java Wed Feb 22 09:43:41 2017
@@ -21,6 +21,7 @@ package org.apache.pig.newplan.logical.v
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.pig.PigException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.Pair;
@@ -110,9 +111,20 @@ public class UnionOnSchemaSetter extends
                 } else {
                     ProjectExpression projExpr = 
                         new ProjectExpression( exprPlan, genInputs.size(), 0, gen );
-                    if( fs.type != DataType.BYTEARRAY
-                        && opSchema.getField( pos ).type != fs.type ) {
-                        new CastExpression( exprPlan, projExpr, fs );
+                    if( opSchema.getField( pos ).type != fs.type ) {
+                        if( fs.type != DataType.BYTEARRAY ) {
+                            CastExpression castexpr = new CastExpression( exprPlan, projExpr, fs );
+                            castexpr.setLocation(union.getLocation());
+                        } else {
+                            int errCode = 1056;
+                            String msg = "Union of incompatible types not allowed. "
+                                         + "Cannot cast from "
+                                         + DataType.findTypeName(opSchema.getField( pos ).type)
+                                         + " to bytearray for '"
+                                         + opSchema.getField( pos ).alias
+                                         + "'. Please typecast to compatible types before union." ;
+                            throw new FrontendException(union, msg, errCode, PigException.INPUT) ;
+                        }
                     }
                     genInputs.add( new LOInnerLoad( innerPlan, foreach, pos ) );
                 }

Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Wed Feb 22 09:43:41 2017
@@ -34,6 +34,7 @@ import org.antlr.runtime.RecognitionExce
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.NonFSLoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -888,7 +889,7 @@ public class LogicalPlanBuilder {
             if (absolutePath == null) {
                 absolutePath = loFunc.relativeToAbsolutePath( filename, QueryParserUtils.getCurrentDir( pigContext ) );
 
-                if (absolutePath!=null) {
+                if (absolutePath!=null && !(loFunc instanceof NonFSLoadFunc)) {
                     QueryParserUtils.setHdfsServers( absolutePath, pigContext );
                 }
                 fileNameMap.put( fileNameKey, absolutePath );
@@ -1357,13 +1358,19 @@ public class LogicalPlanBuilder {
         return Long.parseLong( num );
     }
 
+    /**
+     * Parse big integer formatted string (e.g. "1234567890123BI") into BigInteger object
+     */
     static BigInteger parseBigInteger(String s) {
-        String num = s.substring( 0, s.length() - 1 );
+        String num = s.substring( 0, s.length() - 2 ); // drop the last two characters (the suffix, e.g. "BI")
         return new BigInteger( num );
     }
 
+    /**
+     * Parse big decimal formatted string (e.g. "123456.7890123BD") into BigDecimal object
+     */
     static BigDecimal parseBigDecimal(String s) {
-        String num = s.substring( 0, s.length() - 1 );
+        String num = s.substring( 0, s.length() - 2 ); // drop the last two characters (the suffix, e.g. "BD")
         return new BigDecimal( num );
     }
 
@@ -1781,6 +1788,8 @@ public class LogicalPlanBuilder {
             return JOINTYPE.REPLICATED;
          } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
              return LOJoin.JOINTYPE.HASH;
+         } else if( modifier.equalsIgnoreCase( "bloom" ) ) {
+             return LOJoin.JOINTYPE.BLOOM;
          } else if( modifier.equalsIgnoreCase( "skewed" ) ) {
              return JOINTYPE.SKEWED;
          } else if (modifier.equalsIgnoreCase("merge")) {
@@ -1789,7 +1798,7 @@ public class LogicalPlanBuilder {
              return JOINTYPE.MERGESPARSE;
          } else {
              throw new ParserValidationException( intStream, loc,
-                      "Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
+                      "Only REPL, REPLICATED, HASH, BLOOM, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
          }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/parser/PigMacro.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/PigMacro.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/PigMacro.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/PigMacro.java Wed Feb 22 09:43:41 2017
@@ -168,14 +168,9 @@ class PigMacro {
 
             Map<String, String> paramVal = pc.getParamVal();
             for (Map.Entry<String, String> e : pigContext.getParamVal().entrySet()) {
-                if (paramVal.containsKey(e.getKey())) {
-                    throw new ParserException(
-                        "Macro contains argument or return value " + e.getKey() + " which conflicts " +
-                        "with a Pig parameter of the same name."
-                    );
-                } else {
-                    paramVal.put(e.getKey(), e.getValue());
-                }
+                // overwrite=false since macro parameters should have precedence
+                // over commandline parameters (if keys overlap)
+                pc.processOrdLine(e.getKey(), e.getValue(), false);
             }
             
             ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(pc);
@@ -219,6 +214,7 @@ class PigMacro {
         try {
             result = parser.query();
         } catch (RecognitionException e) {
+            e.line += startLine -1;
             String msg = (fileName == null) ? parser.getErrorHeader(e)
                     : QueryParserUtils.generateErrorHeader(e, fileName);
             msg += " " + parser.getErrorMessage(e, parser.getTokenNames());
@@ -236,7 +232,7 @@ class PigMacro {
         if (!macroDefNodes.isEmpty()) {
             String fname = ((PigParserNode)ast).getFileName();
             String msg = getErrorMessage(fname, ast.getLine(),
-                    "Invalide macro definition", "macro '" + name
+                    "Invalid macro definition", "macro '" + name
                             + "' contains macro definition.\nmacro content: "
                             + body);
             throw new ParserException(msg);
@@ -273,6 +269,7 @@ class PigMacro {
         try {
             result2 = walker.query();
         } catch (RecognitionException e) {
+            e.line += startLine - 1;
             String msg = walker.getErrorHeader(e) + " "
                     + walker.getErrorMessage(e, walker.getTokenNames());
             String msg2 = getErrorMessage(file, line, "Failed to mask macro '"

Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Wed Feb 22 09:43:41 2017
@@ -889,6 +889,8 @@ scalar : INTEGER
        | LONGINTEGER
        | FLOATNUMBER
        | DOUBLENUMBER
+       | BIGINTEGERNUMBER
+       | BIGDECIMALNUMBER
        | QUOTEDSTRING
        | NULL
        | TRUE

Modified: pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/RegisterResolver.java Wed Feb 22 09:43:41 2017
@@ -23,6 +23,10 @@ import java.net.URISyntaxException;
 
 import org.apache.pig.PigServer;
 import org.apache.pig.tools.DownloadResolver;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.hadoop.fs.Path;
 
 public class RegisterResolver {
 
@@ -66,15 +70,24 @@ public class RegisterResolver {
         String scheme = uri.getScheme();
         if (scheme != null) {
             scheme = scheme.toLowerCase();
+            if (scheme.equals("ivy")) {
+                DownloadResolver downloadResolver = DownloadResolver.getInstance();
+                return downloadResolver.downloadArtifact(uri, pigServer);
+            }
+            if (!hasFileSystemImpl(uri)) {
+                throw new ParserException("Invalid Scheme: " + uri.getScheme());
+            }
         }
-        if (scheme == null || scheme.equals("file") || scheme.equals("hdfs")) {
-            return new URI[] { uri };
-        } else if (scheme.equals("ivy")) {
-            DownloadResolver downloadResolver = DownloadResolver.getInstance();
-            return downloadResolver.downloadArtifact(uri, pigServer);
-        } else {
-            throw new ParserException("Invalid Scheme: " + uri.getScheme());
-        }
+        return new URI[] { uri };
+    }
+   
+    /**
+     * @param uri
+     * @return True if the uri has valid file system implementation
+     */ 
+    private boolean hasFileSystemImpl(URI uri) {
+        final Configuration hadoopConf = ConfigurationUtil.toConfiguration(pigServer.getPigContext().getProperties(), true);
+        return HadoopShims.hasFileSystemImpl(new Path(uri), hadoopConf); // the actual lookup is delegated to HadoopShims
     }
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/SourceLocation.java Wed Feb 22 09:43:41 2017
@@ -75,12 +75,11 @@ public class SourceLocation {
         if (node != null) {
             InvocationPoint pt = node.getNextInvocationPoint();
             while (pt != null) {
-                sb.append("\n");
                 sb.append("at expanding macro '" + pt.getMacro() + "' ("
                         + pt.getFile() + ":" + pt.getLine() + ")");
                 pt = node.getNextInvocationPoint();
+                sb.append("\n");
             }
-            sb.append("\n");
         }
         sb.append( "<" );
         if( file != null && !file.isEmpty() )

Modified: pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java (original)
+++ pig/branches/spark/src/org/apache/pig/pen/LocalMapReduceSimulator.java Wed Feb 22 09:43:41 2017
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.TaskID;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -35,6 +36,7 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapBase;
@@ -75,9 +77,9 @@ import org.apache.pig.pen.util.LineageTr
  *
  */
 public class LocalMapReduceSimulator {
-    
+
     private MapReduceLauncher launcher = new MapReduceLauncher();
-    
+
     private Map<PhysicalOperator, PhysicalOperator> phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();;
 
     @SuppressWarnings("unchecked")
@@ -88,12 +90,12 @@ public class LocalMapReduceSimulator {
                               PigContext pc) throws PigException, IOException, InterruptedException {
         phyToMRMap.clear();
         MROperPlan mrp = launcher.compile(php, pc);
-                
+
         ConfigurationValidator.validatePigProperties(pc.getProperties());
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        
+
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        
+
         JobControl jc;
         int numMRJobsCompl = 0;
         DataBag input;
@@ -106,6 +108,8 @@ public class LocalMapReduceSimulator {
         boolean needFileInput;
         final ArrayList<OperatorKey> emptyInpTargets = new ArrayList<OperatorKey>();
         pc.getProperties().setProperty("pig.illustrating", "true");
+        String jtIdentifier = "" + System.currentTimeMillis();
+        int jobId = 0;
         while(mrp.size() != 0) {
             jc = jcc.compile(mrp, "Illustrator");
             if(jc == null) {
@@ -113,6 +117,7 @@ public class LocalMapReduceSimulator {
             }
             List<Job> jobs = jc.getWaitingJobs();
             for (Job job : jobs) {
+                jobId++;
                 jobConf = job.getJobConf();
                 FileLocalizer.setInitialized(false);
                 ArrayList<ArrayList<OperatorKey>> inpTargets =
@@ -123,14 +128,14 @@ public class LocalMapReduceSimulator {
                 PigSplit split = null;
                 List<POStore> stores = null;
                 PhysicalOperator pack = null;
-                // revisit as there are new physical operators from MR compilation 
+                // revisit as there are new physical operators from MR compilation
                 if (!mro.mapPlan.isEmpty())
                     attacher.revisit(mro.mapPlan);
                 if (!mro.reducePlan.isEmpty()) {
                     attacher.revisit(mro.reducePlan);
                     pack = mro.reducePlan.getRoots().get(0);
                 }
-                
+
                 List<POLoad> lds = PlanHelper.getPhysicalOperators(mro.mapPlan, POLoad.class);
                 if (!mro.mapPlan.isEmpty()) {
                     stores = PlanHelper.getPhysicalOperators(mro.mapPlan, POStore.class);
@@ -145,10 +150,10 @@ public class LocalMapReduceSimulator {
                 for (POStore store : stores) {
                     output.put(store.getSFile().getFileName(), attacher.getDataMap().get(store));
                 }
-               
+
                 OutputAttacher oa = new OutputAttacher(mro.mapPlan, output);
                 oa.visit();
-                
+
                 if (!mro.reducePlan.isEmpty()) {
                     oa = new OutputAttacher(mro.reducePlan, output);
                     oa.visit();
@@ -168,6 +173,7 @@ public class LocalMapReduceSimulator {
                     if (input != null)
                         mro.mapPlan.remove(ld);
                 }
+                int mapTaskId = 0;
                 for (POLoad ld : lds) {
                     // check newly generated data first
                     input = output.get(ld.getLFile().getFileName());
@@ -180,7 +186,7 @@ public class LocalMapReduceSimulator {
                                      break;
                                 }
                             }
-                        } 
+                        }
                     }
                     needFileInput = (input == null);
                     split = new PigSplit(null, index, needFileInput ? emptyInpTargets : inpTargets.get(index), 0);
@@ -199,6 +205,7 @@ public class LocalMapReduceSimulator {
                             context = ((PigMapReduceCounter.PigMapCounter) map).getIllustratorContext(jobConf, input, intermediateData, split);
                         }
                         ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
                         map.run(context);
                     } else {
                         if ("true".equals(jobConf.get("pig.usercomparator")))
@@ -210,10 +217,11 @@ public class LocalMapReduceSimulator {
                         Mapper<Text, Tuple, PigNullableWritable, Writable>.Context context = ((PigMapBase) map)
                           .getIllustratorContext(jobConf, input, intermediateData, split);
                         ((PigMapBase) map).setMapPlan(mro.mapPlan);
+                        context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, true, mapTaskId++).toString());
                         map.run(context);
                     }
                 }
-                
+
                 if (!mro.reducePlan.isEmpty())
                 {
                     if (pack instanceof POPackage)
@@ -233,19 +241,20 @@ public class LocalMapReduceSimulator {
                     }
 
                     ((PigMapReduce.Reduce) reduce).setReducePlan(mro.reducePlan);
+                    context.getConfiguration().set(MRConfiguration.TASK_ID, new TaskID(jtIdentifier, jobId, false, 0).toString());
                     reduce.run(context);
                 }
                 for (PhysicalOperator key : mro.phyToMRMap.keySet())
                     for (PhysicalOperator value : mro.phyToMRMap.get(key))
                         phyToMRMap.put(key, value);
             }
-            
-            
+
+
             int removedMROp = jcc.updateMROpPlan(new LinkedList<Job>());
-            
+
             numMRJobsCompl += removedMROp;
         }
-                
+
         jcc.reset();
     }
 
@@ -256,7 +265,7 @@ public class LocalMapReduceSimulator {
                     plan));
             this.outputBuffer = output;
         }
-        
+
         @Override
         public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
             if (userFunc.getFunc() != null && userFunc.getFunc() instanceof ReadScalars) {

Modified: pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/ScriptEngine.java Wed Feb 22 09:43:41 2017
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.util.Shell;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStats;
 
 /**
@@ -127,7 +128,9 @@ public abstract class ScriptEngine {
     //protected static InputStream getScriptAsStream(String scriptPath) {
         InputStream is = null;
         File file = new File(scriptPath);
-        if (file.exists()) {
+        // In the frontend give preference to the local file.
+        // In the backend, try the jar first
+        if (UDFContext.getUDFContext().isFrontend() && file.exists()) {
             try {
                 is = new FileInputStream(file);
             } catch (FileNotFoundException e) {
@@ -156,7 +159,14 @@ public abstract class ScriptEngine {
                 }
             }
         }
-        
+        if (is == null && file.exists()) {
+            try {
+                is = new FileInputStream(file);
+            } catch (FileNotFoundException e) {
+                throw new IllegalStateException("could not find existing file "+scriptPath, e);
+            }
+        }
+
         // TODO: discuss if we want to add logic here to load a script from HDFS
 
         if (is == null) {

Modified: pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/js/JsFunction.java Wed Feb 22 09:43:41 2017
@@ -95,7 +95,7 @@ public class JsFunction extends EvalFunc
 
     private void debugConvertPigToJS(int depth, String pigType, Object value, Schema schema) {
         if (LOG.isDebugEnabled()) {
-            LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + value + " using " + stringify(schema));
+            LOG.debug(indent(depth)+"converting from Pig " + pigType + " " + toString(value) + " using " + stringify(schema));
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/jython/JythonFunction.java Wed Feb 22 09:43:41 2017
@@ -54,7 +54,7 @@ public class JythonFunction extends Eval
         try {
             f = JythonScriptEngine.getFunction(filename, functionName);
             this.function = f;
-            num_parameters = ((PyBaseCode) f.func_code).co_argcount;
+            num_parameters = ((PyBaseCode) f.__code__).co_argcount;
             PyObject outputSchemaDef = f.__findattr__("outputSchema".intern());
             if (outputSchemaDef != null) {
                 this.schema = Utils.getSchemaFromString(outputSchemaDef.toString());
@@ -105,7 +105,7 @@ public class JythonFunction extends Eval
     @Override
     public Object exec(Tuple tuple) throws IOException {
         try {
-            if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.func_code).varargs)) {
+            if (tuple == null || (num_parameters == 0 && !((PyTableCode)function.__code__).varargs)) {
                 // ignore input tuple
                 PyObject out = function.__call__();
                 return JythonUtils.pythonToPig(out);

Modified: pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/DownloadResolver.java Wed Feb 22 09:43:41 2017
@@ -44,8 +44,6 @@ public class DownloadResolver {
     private static DownloadResolver downloadResolver = new DownloadResolver();
 
     private DownloadResolver() {
-        System.setProperty("groovy.grape.report.downloads", "true");
-
         if (System.getProperty("grape.config") != null) {
             LOG.info("Using ivysettings file from " + System.getProperty("grape.config"));
         } else {

Added: pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/ConsoleReaderInputStream.java Wed Feb 22 09:43:41 2017
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.grunt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.util.Enumeration;
+
+import jline.console.ConsoleReader;
+
+/** Borrowed from jline.console.internal.ConsoleReaderInputStream. However,
+ *  we cannot use ConsoleReaderInputStream directly since:
+ *  1. ConsoleReaderInputStream is not public
+ *  2. ConsoleReaderInputStream has a bug which does not deal with UTF-8 correctly
+ */
+public class ConsoleReaderInputStream extends SequenceInputStream {
+    /** The {@link System#in} stream captured when this class is initialized. */
+    private static final InputStream systemIn = System.in;
+
+    /**
+     * Replaces {@link System#in} with a stream backed by a newly created
+     * {@link ConsoleReader}.
+     *
+     * @throws IOException if the ConsoleReader cannot be created
+     */
+    public static void setIn() throws IOException {
+        setIn(new ConsoleReader());
+    }
+
+    /**
+     * Replaces {@link System#in} with a stream that reads lines from the
+     * given reader.
+     *
+     * @param reader the console reader to pull lines from
+     */
+    public static void setIn(final ConsoleReader reader) {
+        System.setIn(new ConsoleReaderInputStream(reader));
+    }
+
+    /**
+     * Restore the original {@link System#in} input stream.
+     */
+    public static void restoreIn() {
+        System.setIn(systemIn);
+    }
+
+    /**
+     * Creates a stream that yields each line read from the reader followed
+     * by a newline, until the reader returns null.
+     *
+     * @param reader the console reader to pull lines from
+     */
+    public ConsoleReaderInputStream(final ConsoleReader reader) {
+        super(new ConsoleEnumeration(reader));
+    }
+
+    /**
+     * Supplies one {@link ConsoleLineInputStream} per console line to the
+     * enclosing SequenceInputStream, and reports no more elements once the
+     * previously handed-out line stream saw a null line.
+     */
+    private static class ConsoleEnumeration implements Enumeration<InputStream> {
+        private final ConsoleReader reader;
+        private ConsoleLineInputStream next = null;
+        private ConsoleLineInputStream prev = null;
+
+        public ConsoleEnumeration(final ConsoleReader reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public InputStream nextElement() {
+            if (next != null) {
+                InputStream n = next;
+                prev = next;
+                next = null;
+
+                return n;
+            }
+
+            return new ConsoleLineInputStream(reader);
+        }
+
+        @Override
+        public boolean hasMoreElements() {
+            // the last line was null
+            if ((prev != null) && prev.wasNull) {
+                return false;
+            }
+
+            if (next == null) {
+                next = (ConsoleLineInputStream) nextElement();
+            }
+
+            return next != null;
+        }
+    }
+
+    /**
+     * Presents a single console line as a byte stream terminated by a
+     * newline. The line is fetched from the reader on the first read.
+     */
+    private static class ConsoleLineInputStream extends InputStream {
+        private final ConsoleReader reader;
+        private byte[] buffer = null;
+        private int index = 0;
+        private boolean eol = false;
+        // set when the reader returned null instead of a line
+        protected boolean wasNull = false;
+
+        public ConsoleLineInputStream(final ConsoleReader reader) {
+            this.reader = reader;
+        }
+
+        /**
+         * Returns the bytes of the line as values in 0..255, then a single
+         * '\n', then -1.
+         *
+         * @return the next byte, or -1 once the line is exhausted or the
+         *         reader returned null
+         * @throws IOException if reading the line from the console fails
+         */
+        @Override
+        public int read() throws IOException {
+            if (eol) {
+                return -1;
+            }
+
+            if (buffer == null) {
+                // readLine() can return null (the wasNull flag exists for
+                // that case), so check before converting to bytes
+                String line = reader.readLine();
+                if (line == null) {
+                    wasNull = true;
+                    return -1;
+                }
+                // platform default charset, unchanged from the original
+                buffer = line.getBytes();
+            }
+
+            if (index >= buffer.length) {
+                eol = true;
+                return '\n'; // lines are ended with a newline
+            }
+
+            // mask so bytes >= 0x80 are not returned as negative ints
+            // (0xFF would otherwise be mistaken for end of stream)
+            return buffer[index++] & 0xFF;
+        }
+    }
+}
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/Grunt.java Wed Feb 22 09:43:41 2017
@@ -20,7 +20,7 @@ package org.apache.pig.tools.grunt;
 import java.io.BufferedReader;
 import java.util.ArrayList;
 
-import jline.ConsoleReader;
+import jline.console.ConsoleReader;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -52,8 +52,8 @@ public class Grunt
 
     public void setConsoleReader(ConsoleReader c)
     {
-        c.addCompletor(new PigCompletorAliases(pig));
-        c.addCompletor(new PigCompletor());
+        c.addCompleter(new PigCompletorAliases(pig));
+        c.addCompleter(new PigCompletor());
         parser.setConsoleReader(c);
     }
 

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Wed Feb 22 09:43:41 2017
@@ -26,7 +26,6 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.io.Reader;
 import java.io.StringReader;
@@ -42,8 +41,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
-import jline.ConsoleReader;
-import jline.ConsoleReaderInputStream;
+import jline.console.ConsoleReader;
 
 import org.apache.commons.io.output.NullOutputStream;
 import org.apache.commons.logging.Log;
@@ -264,7 +262,7 @@ public class GruntParser extends PigScri
     public void prompt()
     {
         if (mInteractive) {
-            mConsoleReader.setDefaultPrompt("grunt> ");
+            mConsoleReader.setPrompt("grunt> ");
         }
     }
 
@@ -516,8 +514,13 @@ public class GruntParser extends PigScri
         ConsoleReader reader;
         boolean interactive;
 
-        mPigServer.getPigContext().setParams(params);
-        mPigServer.getPigContext().setParamFiles(files);
+        PigContext pc = mPigServer.getPigContext();
+
+        if( !loadOnly ) {
+          pc.getPreprocessorContext().paramScopePush();
+        }
+        pc.setParams(params);
+        pc.setParamFiles(files);
 
         try {
             FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script);
@@ -528,7 +531,7 @@ public class GruntParser extends PigScri
                 cmds = cmds.replaceAll("\t","    ");
 
                 reader = new ConsoleReader(new ByteArrayInputStream(cmds.getBytes()),
-                                           new OutputStreamWriter(System.out));
+                                           System.out);
                 reader.setHistory(mConsoleReader.getHistory());
                 InputStream in = new ConsoleReaderInputStream(reader);
                 inputReader = new BufferedReader(new InputStreamReader(in));
@@ -560,6 +563,9 @@ public class GruntParser extends PigScri
         if (interactive) {
             System.out.println("");
         }
+        if( ! loadOnly ) {
+          pc.getPreprocessorContext().paramScopePop();
+        }
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletor.java Wed Feb 22 09:43:41 2017
@@ -33,9 +33,9 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import jline.Completor;
+import jline.console.completer.Completer;
 
-public class PigCompletor implements Completor {
+public class PigCompletor implements Completer {
     private final Log log = LogFactory.getLog(getClass());
     Set<String> candidates;
     static final String AUTOCOMPLETE_FILENAME = "autocomplete";

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/PigCompletorAliases.java Wed Feb 22 09:43:41 2017
@@ -26,12 +26,11 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.pig.PigServer;
 
-import jline.Completor;
+import jline.console.completer.Completer;
 
-public class PigCompletorAliases implements Completor {
+public class PigCompletorAliases implements Completer {
     private final Log log = LogFactory.getLog(getClass());
     Set<String> keywords;
     PigServer pig;

Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj Wed Feb 22 09:43:41 2017
@@ -259,8 +259,11 @@ TOKEN :
     <PIGDEFAULT: "%default" > 
 }
 
+
 TOKEN : 
 {
+    <REGISTER: "register"> : IN_REGISTER
+    |
     <IDENTIFIER: (<SPECIALCHAR>)*<LETTER>(<DIGIT> | <LETTER> | <SPECIALCHAR>)*>
     |
     <LITERAL: ("\"" ((~["\""])*("\\\"")?)* "\"")|("'" ((~["'"])*("\\\'")?)* "'") >
@@ -276,7 +279,14 @@ TOKEN :
     <OTHER: (~["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"])+ >
     |
     <NOT_OTHER_CHAR: ["\"" , "'" , "`" , "a"-"z" , "A"-"Z" , "_" , "#" , "=" , " " , "\n" , "\t" , "\r", "%", "/", "-", "$"] >
- 
+}
+
+<IN_REGISTER> MORE : { " " |  "\t" | "\r" | "\n"}
+
+<IN_REGISTER> TOKEN: {
+  <PATH: (~["(", ")", ";", "\r", " ", "\t", "\n"])+> {
+        matchedToken.image = image.toString();
+    }: DEFAULT
 }
 
 void Parse() throws IOException : {}
@@ -288,6 +298,7 @@ void input() throws IOException  :
 {
     String s;
     Token strTok = null;
+    Token strTok2 = null;
 }
 {
     strTok = <PIG>
@@ -308,6 +319,20 @@ void input() throws IOException  :
         { pc.validate(strTok.toString()); }
     )
     |
+    strTok = <REGISTER>
+    strTok2 = <PATH> {}
+    {
+        // Special case for register: its path argument may contain "/*" globbing,
+        // which conflicts with the general multi-line comment syntax "/*   */".
+        // See the comment above on OTHER for how the tokenizer picks the longest
+        // match.  Here, the string following "register" is lexed as a PATH token,
+        // so "/*" inside it is not treated as the start of a comment
+        // (which avoids the longest-match problem).
+        out.append(strTok.image);
+        String sub_line = pc.substitute(strTok2.image);
+        out.append(sub_line);
+    }
+    |
     s = paramString(){}
     {
         //process an ordinary pig line - perform substitution