You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC

svn commit: r1642132 [7/14] - in /pig/branches/spark: ./ bin/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/ contrib/piggybank/java/sr...

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Thu Nov 27 12:49:54 2014
@@ -187,19 +187,18 @@ public abstract class DefaultAbstractBag
     private long totalSizeFromAvgTupleSize(long avgTupleSize, int numInMem) {
         long used = avgTupleSize * numInMem;
 
-        // add up the overhead for this object and other object variables
-        int bag_fix_size = 8 /* object header */
-        + 4 + 8 + 8 /* mLastContentsSize + mMemSize + mSize */
-        + 8 + 8 /* mContents ref  + mSpillFiles ref*/
-        + 4 /* +4 to round it to eight*/
-        + 36 /* mContents fixed */
-        ;
         long mFields_size =   roundToEight(4 + numInMem*4); /* mContents fixed + per entry */
         // in java hotspot 32bit vm, there seems to be a minimum bag size of 188 bytes
         // some of the extra bytes is probably from a minimum size of this array list
         mFields_size = Math.max(40, mFields_size);
 
-        used += bag_fix_size + mFields_size;
+        // the fixed overhead for this object and other object variables = 84 bytes
+        // 8 - object header
+        // 4 + 8 + 8 - sampled + aggSampleTupleSize + mSize
+        // 8 + 8 - mContents ref  + mSpillFiles ref
+        // 4 - spillableRegistered +4 instead of 1 to round it to eight
+        // 36 - mContents fixed
+        used += 84 + mFields_size;
 
         // add up overhead for mSpillFiles ArrayList, Object[] inside ArrayList,
         // object variable inside ArrayList and references to spill files
@@ -444,7 +443,7 @@ public abstract class DefaultAbstractBag
         if (reporter != null && reporter.getCounter(counter)!=null) {
             reporter.getCounter(counter).increment(numRecsSpilled);
         } else {
-            PigHadoopLogger.getInstance().warn(this, "Spill counter incremented", counter);
+            PigHadoopLogger.getInstance().warn(mContents, "Spill counter incremented", counter);
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultTuple.java Thu Nov 27 12:49:54 2014
@@ -45,7 +45,6 @@ import org.apache.pig.impl.util.ObjectSe
  */
 public class DefaultTuple extends AbstractTuple {
 
-    protected boolean isNull = false;
     private static final long serialVersionUID = 2L;
     protected List<Object> mFields;
 
@@ -165,11 +164,6 @@ public class DefaultTuple extends Abstra
     @Override
     public long getMemorySize() {
         Iterator<Object> i = mFields.iterator();
-        // fixed overhead
-        long empty_tuple_size = 8 /* tuple object header */
-        + 8 /* isNull - but rounded to 8 bytes as total obj size needs to be multiple of 8 */
-        + 8 /* mFields reference */
-        + 32 /* mFields array list fixed size */;
 
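-        // rest of the fixed portion of mfields size is accounted within empty_tuple_size
+        // rest of the fixed portion of mfields size is accounted within the fixed overhead added to sum below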
         long mfields_var_size = SizeUtil.roundToEight(4 + 4 * mFields.size());
@@ -177,7 +171,11 @@ public class DefaultTuple extends Abstra
         // which is probably from the minimum size of this array list
         mfields_var_size = Math.max(40, mfields_var_size);
 
-        long sum = empty_tuple_size + mfields_var_size;
+        // fixed overhead = 48 bytes
+        //8 - tuple object header
+        //8 - mFields reference
+        //32 - mFields array list fixed size
+        long sum = 48 + mfields_var_size;
         while (i.hasNext()) {
             sum += SizeUtil.getPigObjMemSize(i.next());
         }

Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Thu Nov 27 12:49:54 2014
@@ -516,19 +516,27 @@ public class DistinctDataBag extends Def
                     // the spill files list.  So I need to append it to my
                     // linked list as well so that it's still there when I
                     // move my linked list back to the spill files.
+                    DataOutputStream out = null;
                     try {
-                        DataOutputStream out = getSpillFile();
+                        out = getSpillFile();
                         ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
                         Tuple t;
                         while ((t = readFromTree()) != null) {
                             t.write(out);
                         }
                         out.flush();
-                        out.close();
                     } catch (IOException ioe) {
                         String msg = "Unable to find our spill file.";
                         log.fatal(msg, ioe);
                         throw new RuntimeException(msg, ioe);
+                    } finally {
+                        if (out != null) {
+                            try {
+                                out.close();
+                            } catch (IOException e) {
+                                warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+                            }
+                        }
                     }
                 }
                 // delete files that have been merged into new files

Modified: pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/InternalDistinctBag.java Thu Nov 27 12:49:54 2014
@@ -36,6 +36,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -81,7 +82,7 @@ public class InternalDistinctBag extends
         if (percent < 0) {
             percent = 0.2F;
             if (PigMapReduce.sJobConfInternal.get() != null) {
-                String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+                String usage = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_MEMUSAGE);
                 if (usage != null) {
                     percent = Float.parseFloat(usage);
                 }
@@ -424,19 +425,29 @@ public class InternalDistinctBag extends
                     // the spill files list.  So I need to append it to my
                     // linked list as well so that it's still there when I
                     // move my linked list back to the spill files.
+                    DataOutputStream out = null;
                     try {
-                        DataOutputStream out = getSpillFile();
+                        out = getSpillFile();
                         ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
                         Tuple t;
                         while ((t = readFromTree()) != null) {
                             t.write(out);
                         }
                         out.flush();
-                        out.close();
                     } catch (IOException ioe) {
                         String msg = "Unable to find our spill file.";
                         log.fatal(msg, ioe);
                         throw new RuntimeException(msg, ioe);
+                    } finally {
+                        // out is still null when getSpillFile() itself failed; guard the
+                        // close so an NPE here cannot mask the exception thrown above.
+                        if (out != null) {
+                            try {
+                                out.close();
+                            } catch (IOException e) {
+                                warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+                            }
+                        }
                     }
                 }
 

Modified: pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/InternalSortedBag.java Thu Nov 27 12:49:54 2014
@@ -35,6 +35,7 @@ import java.util.PriorityQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigWarning;
 
 
 /**
@@ -401,19 +402,27 @@ public class InternalSortedBag extends S
                     // the spill files list.  So I need to append it to my
                     // linked list as well so that it's still there when I
                     // move my linked list back to the spill files.
+                    DataOutputStream out = null;
                     try {
-                        DataOutputStream out = getSpillFile();
+                        out = getSpillFile();
                         ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
                         Tuple t;
                         while ((t = readFromPriorityQ()) != null) {
                             t.write(out);
                         }
                         out.flush();
-                        out.close();
                     } catch (IOException ioe) {
                         String msg = "Unable to find our spill file.";
                         log.fatal(msg, ioe);
                         throw new RuntimeException(msg, ioe);
+                    } finally {
+                        if (out != null) {
+                            try {
+                                out.close();
+                            } catch (IOException e) {
+                                warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+                            }
+                        }
                     }
                 }
                 // delete files that have been merged into new files

Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Thu Nov 27 12:49:54 2014
@@ -17,7 +17,7 @@
  */
 package org.apache.pig.data;
 
-import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE;
+import static org.apache.pig.PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED;
 import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT;
 
 import java.io.File;
@@ -33,7 +33,6 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigConstants;
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
 import org.apache.pig.data.utils.StructuresHelper.SchemaKey;
@@ -151,8 +150,8 @@ public class SchemaTupleBackend {
             return;
         }
         // Step one is to see if there are any classes in the distributed cache
-        if (!jConf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
-            LOG.info("Key [" + SHOULD_USE_SCHEMA_TUPLE +"] was not set... will not generate code.");
+        if (!jConf.getBoolean(PIG_SCHEMA_TUPLE_ENABLED, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
+            LOG.info("Key [" + PIG_SCHEMA_TUPLE_ENABLED +"] was not set... will not generate code.");
             return;
         }
         // Step two is to copy everything from the distributed cache if we are in distributed mode
@@ -184,14 +183,27 @@ public class SchemaTupleBackend {
             LOG.info("Attempting to read file: " + s);
             // The string is the symlink into the distributed cache
             File src = new File(s);
-            FileInputStream fin = new FileInputStream(src);
-            FileOutputStream fos = new FileOutputStream(new File(codeDir, s));
-
-            fin.getChannel().transferTo(0, src.length(), fos.getChannel());
+            FileInputStream fin = null;
+            FileOutputStream fos = null;
+            try {
+                fin = new FileInputStream(src);
+                fos = new FileOutputStream(new File(codeDir, s));
 
-            fin.close();
-            fos.close();
-            LOG.info("Successfully copied file to local directory.");
+                fin.getChannel().transferTo(0, src.length(), fos.getChannel());
+                LOG.info("Successfully copied file to local directory.");
+            } finally {
+                // Nest the closes so that an IOException from fin.close()
+                // can no longer skip fos.close() and leak the output stream.
+                try {
+                    if (fin != null) {
+                        fin.close();
+                    }
+                } finally {
+                    if (fos != null) {
+                        fos.close();
+                    }
+                }
+            }
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleClassGenerator.java Thu Nov 27 12:49:54 2014
@@ -61,27 +61,27 @@ public class SchemaTupleClassGenerator {
          * This context is used in UDF code. Currently, this is only used for
          * the inputs to UDF's.
          */
-        UDF (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_UDF, true, GenerateUdf.class),
+        UDF (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_UDF, true, GenerateUdf.class),
         /**
          * This context is for POForEach. This will use the expected output of a ForEach
          * to return a typed Tuple.
          */
-        FOREACH (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FOREACH, true, GenerateForeach.class),
+        FOREACH (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_FOREACH, true, GenerateForeach.class),
         /**
          * This context controls whether or not SchemaTuples will be used in FR joins.
          * Currently, they will be used in the HashMap that FR Joins construct.
          */
-        FR_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_FRJOIN, true, GenerateFrJoin.class),
+        FR_JOIN (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_FRJOIN, true, GenerateFrJoin.class),
         /**
          * This context controls whether or not SchemaTuples will be used in merge joins.
          */
-        MERGE_JOIN (PigConfiguration.SCHEMA_TUPLE_SHOULD_USE_IN_MERGEJOIN, true, GenerateMergeJoin.class),
+        MERGE_JOIN (PigConfiguration.PIG_SCHEMA_TUPLE_USE_IN_MERGEJOIN, true, GenerateMergeJoin.class),
         /**
          * All registered Schemas will also be registered in one additional context.
          * This context will allow users to "force" the load of a SchemaTupleFactory
          * if one is present in any context.
          */
-        FORCE_LOAD (PigConfiguration.SCHEMA_TUPLE_SHOULD_ALLOW_FORCE, true, GenerateForceLoad.class);
+        FORCE_LOAD (PigConfiguration.PIG_SCHEMA_TUPLE_ALLOW_FORCE, true, GenerateForceLoad.class);
 
         /**
          * These annotations are used to mark a given SchemaTuple with
@@ -226,7 +226,7 @@ public class SchemaTupleClassGenerator {
      */
     //TODO in the future, we can use ASM to generate the bytecode directly.
     private static void compileCodeString(String className, String generatedCodeString, File codeDir) {
-        JavaCompilerHelper compiler = new JavaCompilerHelper(); 
+        JavaCompilerHelper compiler = new JavaCompilerHelper();
         String tempDir = codeDir.getAbsolutePath();
         compiler.addToClassPath(tempDir);
         LOG.debug("Compiling SchemaTuple code with classpath: " + compiler.getClassPath());
@@ -242,12 +242,14 @@ public class SchemaTupleClassGenerator {
             this.id = id;
         }
 
+        @Override
         public void prepare() {
             add("@Override");
             add("protected int generatedCodeCompareToSpecific(SchemaTuple_"+id+" t) {");
             add("    int i = 0;");
         }
 
+        @Override
         public void process(int fieldNum, Schema.FieldSchema fs) {
             add("    i = compare(checkIfNull_" + fieldNum + "(), getPos_"
                     + fieldNum + "(), t.checkIfNull_" + fieldNum + "(), t.getPos_"
@@ -257,6 +259,7 @@ public class SchemaTupleClassGenerator {
             add("    }");
         }
 
+        @Override
         public void end() {
             add("    return i;");
             add("}");
@@ -271,6 +274,7 @@ public class SchemaTupleClassGenerator {
             this.id = id;
         }
 
+        @Override
         public void prepare() {
             add("@Override");
             add("protected int generatedCodeCompareTo(SchemaTuple t, boolean checkType) {");
@@ -282,6 +286,7 @@ public class SchemaTupleClassGenerator {
         boolean compIsNull = false;
         boolean compByte = false;
 
+        @Override
         public void process(int fieldNum, Schema.FieldSchema fs) {
             add("        i = compareWithElementAtPos(checkIfNull_" + fieldNum + "(), getPos_" + fieldNum + "(), t, " + fieldNum + ");");
             add("        if (i != 0) {");
@@ -289,6 +294,7 @@ public class SchemaTupleClassGenerator {
             add("        }");
         }
 
+        @Override
         public void end() {
             add("    return 0;");
             add("}");
@@ -296,16 +302,19 @@ public class SchemaTupleClassGenerator {
     }
 
     static class HashCode extends TypeInFunctionStringOut {
+        @Override
         public void prepare() {
             add("@Override");
             add("public int generatedCodeHashCode() {");
             add("    int h = 17;");
         }
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             add("    h = hashCodePiece(h, getPos_" + fieldPos + "(), checkIfNull_" + fieldPos + "());");
         }
 
+        @Override
         public void end() {
             add("    return h;");
             add("}");
@@ -323,6 +332,7 @@ public class SchemaTupleClassGenerator {
         private int booleans = 0;
         private File codeDir;
 
+        @Override
         public void prepare() {
             String s;
             try {
@@ -333,6 +343,7 @@ public class SchemaTupleClassGenerator {
             add("private static Schema schema = staticSchemaGen(\"" + s + "\");");
         }
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             if (!isTuple()) {
                 if (isPrimitive() && (primitives++ % 8 == 0)) {
@@ -385,6 +396,7 @@ public class SchemaTupleClassGenerator {
         private int byteField = 0; //this is for setting booleans
         private int byteIncr = 0; //this is for counting the booleans we've encountered
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             if (!isTuple()) {
                 add("public void setPos_"+fieldPos+"("+typeName()+" v) {");
@@ -433,27 +445,32 @@ public class SchemaTupleClassGenerator {
     }
 
     static class ListSetString extends TypeInFunctionStringOut {
+        @Override
         public void prepare() {
             add("@Override");
             add("public void generatedCodeSetIterator(Iterator<Object> it) throws ExecException {");
         }
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             add("    setPos_"+fieldPos+"(unbox(it.next(), getDummy_"+fieldPos+"()));");
         }
 
+        @Override
         public void end() {
             add("}");
         }
     }
 
     static class GenericSetString extends TypeInFunctionStringOut {
+        @Override
         public void prepare() {
             add("@Override");
             add("public void generatedCodeSetField(int fieldNum, Object val) throws ExecException {");
             add("    switch (fieldNum) {");
         }
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             add("    case ("+fieldPos+"):");
             add("        if (val == null) {");
@@ -464,6 +481,7 @@ public class SchemaTupleClassGenerator {
             add("        break;");
         }
 
+        @Override
         public void end() {
             add("    default:");
             add("        throw new ExecException(\"Invalid index given to set: \" + fieldNum);");
@@ -473,16 +491,19 @@ public class SchemaTupleClassGenerator {
     }
 
     static class GenericGetString extends TypeInFunctionStringOut {
+        @Override
         public void prepare() {
             add("@Override");
             add("public Object generatedCodeGetField(int fieldNum) throws ExecException {");
             add("    switch (fieldNum) {");
         }
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             add("    case ("+fieldPos+"): return checkIfNull_"+fieldPos+"() ? null : box(getPos_"+fieldPos+"());");
         }
 
+        @Override
         public void end() {
             add("    default: throw new ExecException(\"Invalid index given to get: \" + fieldNum);");
             add("    }");
@@ -491,16 +512,19 @@ public class SchemaTupleClassGenerator {
     }
 
     static class GeneralIsNullString extends TypeInFunctionStringOut {
+        @Override
         public void prepare() {
             add("@Override");
             add("public boolean isGeneratedCodeFieldNull(int fieldNum) throws ExecException {");
             add("    switch (fieldNum) {");
         }
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             add("    case ("+fieldPos+"): return checkIfNull_"+fieldPos+"();");
         }
 
+        @Override
         public void end() {
             add("    default: throw new ExecException(\"Invalid index given: \" + fieldNum);");
             add("    }");
@@ -512,6 +536,7 @@ public class SchemaTupleClassGenerator {
         private int nullByte = 0; //the byte_ val
         private int byteIncr = 0; //the mask we're on
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             add("public boolean checkIfNull_" + fieldPos + "() {");
             if (isPrimitive()) {
@@ -532,6 +557,7 @@ public class SchemaTupleClassGenerator {
         private int nullByte = 0; //the byte_ val
         private int byteIncr = 0; //the mask we're on
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             add("public void setNull_"+fieldPos+"(boolean b) {");
             if (isPrimitive()) {
@@ -554,11 +580,13 @@ public class SchemaTupleClassGenerator {
     static class SetEqualToSchemaTupleSpecificString extends TypeInFunctionStringOut {
         private int id;
 
+        @Override
         public void prepare() {
             add("@Override");
             add("protected SchemaTuple generatedCodeSetSpecific(SchemaTuple_"+id+" t) {");
         }
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             add("    if (t.checkIfNull_" + fieldPos + "()) {");
             add("        setNull_" + fieldPos + "(true);");
@@ -568,6 +596,7 @@ public class SchemaTupleClassGenerator {
             addBreak();
         }
 
+        @Override
         public void end() {
             add("    return this;");
             add("}");
@@ -586,6 +615,7 @@ public class SchemaTupleClassGenerator {
             this.id = id;
         }
 
+        @Override
         public void prepare() {
             add("@Override");
             add("public boolean isSpecificSchemaTuple(Object o) {");
@@ -599,15 +629,18 @@ public class SchemaTupleClassGenerator {
     static class WriteNullsString extends TypeInFunctionStringOut {
         String s = "    boolean[] b = {\n";
 
+        @Override
         public void prepare() {
             add("@Override");
             add("protected boolean[] generatedCodeNullsArray() throws IOException {");
         }
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             s += "        checkIfNull_"+fieldPos+"(),\n";
         }
 
+        @Override
         public void end() {
             s = s.substring(0, s.length() - 2) + "\n    };";
             add(s);
@@ -626,11 +659,13 @@ public class SchemaTupleClassGenerator {
 
         private int booleans = 0;
 
+        @Override
         public void prepare() {
             add("@Override");
             add("protected void generatedCodeReadFields(DataInput in, boolean[] b) throws IOException {");
         }
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             if (isBoolean()) {
                 booleans++;
@@ -659,6 +694,7 @@ public class SchemaTupleClassGenerator {
             }
         }
 
+        @Override
         public void end() {
             if (booleans > 0) {
                 int i = 0;
@@ -679,6 +715,7 @@ public class SchemaTupleClassGenerator {
 
 
     static class WriteString extends TypeInFunctionStringOut {
+        @Override
         public void prepare() {
             add("@Override");
             add("protected void generatedCodeWriteElements(DataOutput out) throws IOException {");
@@ -686,6 +723,7 @@ public class SchemaTupleClassGenerator {
 
         private int booleans = 0;
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             if (isBoolean()) {
                 booleans++;
@@ -697,6 +735,7 @@ public class SchemaTupleClassGenerator {
             }
         }
 
+        @Override
         public void end() {
             if (booleans > 0) {
                 int i = 0;
@@ -716,6 +755,7 @@ public class SchemaTupleClassGenerator {
 
         String s = "    return SizeUtil.roundToEight(";
 
+        @Override
         public void prepare() {
             add("@Override");
             add("public long getGeneratedCodeMemorySize() {");
@@ -725,6 +765,7 @@ public class SchemaTupleClassGenerator {
         private int primitives = 0;
 
         //TODO a null array or object variable still takes up space for the pointer, yes?
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             if (isInt() || isFloat()) {
                 size += 4;
@@ -757,6 +798,7 @@ public class SchemaTupleClassGenerator {
             }
         }
 
+        @Override
         public void end() {
             s += size + ");";
             add(s);
@@ -766,6 +808,7 @@ public class SchemaTupleClassGenerator {
     }
 
     static class GetDummyString extends TypeInFunctionStringOut {
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             add("public "+typeName()+" getDummy_"+fieldPos+"() {");
             switch (fs.type) {
@@ -795,6 +838,7 @@ public class SchemaTupleClassGenerator {
         private int booleanByte = 0;
         private int booleans;
 
+        @Override
         public void process(int fieldPos, Schema.FieldSchema fs) {
             if (!isTuple()) {
                 add("public "+typeName()+" getPos_"+fieldPos+"() {");
@@ -823,6 +867,7 @@ public class SchemaTupleClassGenerator {
     static class GetSchemaTupleIdentifierString extends TypeInFunctionStringOut {
         private int id;
 
+        @Override
         public void end() {
             add("@Override");
             add("public int getSchemaTupleIdentifier() {");
@@ -839,10 +884,12 @@ public class SchemaTupleClassGenerator {
     static class SchemaSizeString extends TypeInFunctionStringOut {
         int i = 0;
 
+        @Override
         public void process(int fieldNum, Schema.FieldSchema fS) {
             i++;
         }
 
+        @Override
         public void end() {
             add("@Override");
             add("protected int schemaSize() {");
@@ -855,10 +902,12 @@ public class SchemaTupleClassGenerator {
     static class SizeString extends TypeInFunctionStringOut {
         int i = 0;
 
+        @Override
         public void process(int fieldNum, Schema.FieldSchema fS) {
             i++;
         }
 
+        @Override
         public void end() {
             add("@Override");
             add("protected int generatedCodeSize() {");
@@ -873,16 +922,19 @@ public class SchemaTupleClassGenerator {
     }
 
     static class GetTypeString extends TypeInFunctionStringOut {
+        @Override
         public void prepare() {
             add("@Override");
             add("public byte getGeneratedCodeFieldType(int fieldNum) throws ExecException {");
             add("    switch (fieldNum) {");
         }
 
+        @Override
         public void process(int fieldNum, Schema.FieldSchema fs) {
             add("    case ("+fieldNum+"): return "+fs.type+";");
         }
 
+        @Override
         public void end() {
             add("    default: throw new ExecException(\"Invalid index given: \" + fieldNum);");
             add("    }");
@@ -898,6 +950,7 @@ public class SchemaTupleClassGenerator {
             this.id = id;
         }
 
+        @Override
         public void prepare() {
             add("@Override");
             add("protected SchemaTuple generatedCodeSet(SchemaTuple t, boolean checkClass) throws ExecException {");
@@ -913,6 +966,7 @@ public class SchemaTupleClassGenerator {
             addBreak();
         }
 
+        @Override
         public void process(int fieldNum, Schema.FieldSchema fs) {
             add("    if ("+fs.type+" != theirFS.get("+fieldNum+").type) {");
             add("        throw new ExecException(\"Given SchemaTuple does not match current in field " + fieldNum + ". Expected type: " + fs.type + ", found: \" + theirFS.get("+fieldNum+").type);");
@@ -929,6 +983,7 @@ public class SchemaTupleClassGenerator {
             addBreak();
         }
 
+        @Override
         public void end() {
             add("    return this;");
             add("}");
@@ -940,18 +995,21 @@ public class SchemaTupleClassGenerator {
             super(type);
         }
 
+        @Override
         public void prepare() {
             add("@Override");
             add("protected "+name()+" generatedCodeGet"+properName()+"(int fieldNum) throws ExecException {");
             add("    switch(fieldNum) {");
         }
 
+        @Override
         public void process(int fieldNum, Schema.FieldSchema fs) {
             if (fs.type==thisType()) {
                 add("    case ("+fieldNum+"): return returnUnlessNull(checkIfNull_"+fieldNum+"(), getPos_"+fieldNum+"());");
             }
         }
 
+        @Override
         public void end() {
             add("    default:");
             add("        return unbox"+properName()+"(getTypeAwareBase(fieldNum, \""+name()+"\"));");
@@ -979,17 +1037,20 @@ public class SchemaTupleClassGenerator {
             return proper(thisType());
         }
 
+        @Override
         public void prepare() {
             add("@Override");
             add("protected void generatedCodeSet"+properName()+"(int fieldNum, "+name()+" val) throws ExecException {");
             add("    switch(fieldNum) {");
         }
 
+        @Override
         public void process(int fieldNum, Schema.FieldSchema fs) {
             if (fs.type==thisType())
                 add("    case ("+fieldNum+"): setPos_"+fieldNum+"(val); break;");
         }
 
+        @Override
         public void end() {
             add("    default: setTypeAwareBase(fieldNum, val, \""+name()+"\");");
             add("    }");

Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleFrontend.java Thu Nov 27 12:49:54 2014
@@ -17,7 +17,7 @@
  */
 package org.apache.pig.data;
 
-import static org.apache.pig.PigConfiguration.SHOULD_USE_SCHEMA_TUPLE;
+import static org.apache.pig.PigConfiguration.PIG_SCHEMA_TUPLE_ENABLED;
 import static org.apache.pig.PigConstants.GENERATED_CLASSES_KEY;
 import static org.apache.pig.PigConstants.LOCAL_CODE_DIR;
 import static org.apache.pig.PigConstants.SCHEMA_TUPLE_ON_BY_DEFAULT;
@@ -177,8 +177,8 @@ public class SchemaTupleFrontend {
          */
         private boolean generateAll(Map<Pair<SchemaKey, Boolean>, Pair<Integer, Set<GenContext>>> schemasToGenerate) {
             boolean filesToShip = false;
-            if (!conf.getBoolean(SHOULD_USE_SCHEMA_TUPLE, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
-                LOG.info("Key ["+SHOULD_USE_SCHEMA_TUPLE+"] is false, will not generate code.");
+            if (!conf.getBoolean(PIG_SCHEMA_TUPLE_ENABLED, SCHEMA_TUPLE_ON_BY_DEFAULT)) {
+                LOG.info("Key ["+PIG_SCHEMA_TUPLE_ENABLED+"] is false, will not generate code.");
                 return false;
             }
             LOG.info("Generating all registered Schemas.");

Modified: pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java Thu Nov 27 12:49:54 2014
@@ -63,7 +63,7 @@ public abstract class SelfSpillBag exten
             maxMem = Runtime.getRuntime().maxMemory();
             if (PigMapReduce.sJobConfInternal.get() != null) {
                 String usage = PigMapReduce.sJobConfInternal.get().get(
-                        PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+                        PigConfiguration.PIG_CACHEDBAG_MEMUSAGE);
                 if (usage != null) {
                     cachedMemUsage = Float.parseFloat(usage);
                 }

Modified: pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java Thu Nov 27 12:49:54 2014
@@ -491,19 +491,25 @@ public class SortedDataBag extends Defau
                     // the spill files list.  So I need to append it to my
                     // linked list as well so that it's still there when I
                     // move my linked list back to the spill files.
+                    DataOutputStream out = null;
                     try {
-                        DataOutputStream out = getSpillFile();
+                        out = getSpillFile();
                         ll.add(mSpillFiles.get(mSpillFiles.size() - 1));
                         Tuple t;
                         while ((t = readFromPriorityQ()) != null) {
                             t.write(out);
                         }
                         out.flush();
-                        out.close();
                     } catch (IOException ioe) {
                         String msg = "Unable to find our spill file.";
                         log.fatal(msg, ioe);
                         throw new RuntimeException(msg, ioe);
+                    } finally {
+                        try {
+                            out.close();
+                        } catch (IOException e) {
+                            warn("Error closing spill", PigWarning.UNABLE_TO_CLOSE_SPILL_FILE, e);
+                        }
                     }
                 }
                 // delete files that have been merged into new files

Modified: pig/branches/spark/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigContext.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigContext.java Thu Nov 27 12:49:54 2014
@@ -48,13 +48,13 @@ import org.antlr.runtime.tree.Tree;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Level;
 import org.apache.pig.ExecType;
 import org.apache.pig.ExecTypeProvider;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.Main;
+import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
@@ -65,7 +65,6 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.parameters.ParseException;
 import org.apache.pig.tools.parameters.PreprocessorContext;
@@ -105,7 +104,9 @@ public class PigContext implements Seria
      * Resources for the job (jars, scripting udf files, cached macro abstract syntax trees)
      */
 
-    // extra jar files that are needed to run a job
+    // Jar files that are global to the whole Pig script, includes
+    // 1. registered jars
+    // 2. Jars defined in -Dpig.additional.jars
     transient public List<URL> extraJars = new LinkedList<URL>();
 
     // original paths each extra jar came from
@@ -115,10 +116,6 @@ public class PigContext implements Seria
     // jars needed for scripting udfs - jython.jar etc
     transient public List<String> scriptJars = new ArrayList<String>(2);
 
-    // jars that should not be merged in.
-    // (some functions may come from pig.jar and we don't want the whole jar file.)
-    transient public Vector<String> skipJars = new Vector<String>(2);
-
     // jars that are predeployed to the cluster and thus should not be merged in at all (even subsets).
     transient public Vector<String> predeployedJars = new Vector<String>(2);
 
@@ -174,6 +171,15 @@ public class PigContext implements Seria
     // List of paths skipped for automatic shipping
     List<String> skippedShipPaths = new ArrayList<String>();
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(PigContext.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        packageImportList.set(null);
+    }
+
     /**
      * extends URLClassLoader to allow adding to classpath as new jars
      * are registered.
@@ -260,13 +266,6 @@ public class PigContext implements Seria
         this.properties = properties;
 
         this.properties.setProperty("exectype", this.execType.name());
-        String pigJar = JarManager.findContainingJar(Main.class);
-        String hadoopJar = JarManager.findContainingJar(FileSystem.class);
-        if (pigJar != null) {
-            addSkipJar(pigJar);
-            if (!pigJar.equals(hadoopJar))
-                addSkipJar(hadoopJar);
-        }
 
         this.executionEngine = execType.getExecutionEngine(this);
 
@@ -345,12 +344,6 @@ public class PigContext implements Seria
         }
     }
 
-    public void addSkipJar(String path) {
-        if (path != null && !skipJars.contains(path)) {
-            skipJars.add(path);
-        }
-    }
-
     public void addJar(String path) throws MalformedURLException {
         if (path != null) {
             URL resource = (new File(path)).toURI().toURL();
@@ -409,6 +402,7 @@ public class PigContext implements Seria
 
     public String doParamSubstitution(BufferedReader reader) throws IOException {
         try {
+            preprocessorContext.setPigContext(this);
             preprocessorContext.loadParamVal(params, paramFiles);
             ParameterSubstitutionPreprocessor psp
                 = new ParameterSubstitutionPreprocessor(preprocessorContext);

Modified: pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java Thu Nov 27 12:49:54 2014
@@ -43,7 +43,24 @@ public class PigImplConstants {
     public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
 
     /**
+     * Used by pig to indicate that current job has been converted to run in fetch mode
+     */
+    public static final String CONVERTED_TO_FETCH = "pig.job.converted.fetch";
+
+    /**
      * Indicate the split index of the task. Used by merge cogroup
      */
     public static final String PIG_SPLIT_INDEX = "pig.split.index";
+
+    /**
+     * Parallelism for the reducer
+     */
+    public static final String REDUCER_DEFAULT_PARALLELISM = "pig.info.reducers.default.parallel";
+    public static final String REDUCER_REQUESTED_PARALLELISM = "pig.info.reducers.requested.parallel";
+    public static final String REDUCER_ESTIMATED_PARALLELISM = "pig.info.reducers.estimated.parallel";
+
+    /**
+     * Parallelism to be used for CROSS operation by GFCross UDF
+     */
+    public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism";
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu Nov 27 12:49:54 2014
@@ -173,7 +173,7 @@ public class FindQuantiles extends EvalF
             }
             long numSamples = samples.size();
             double toSkip = (double)numSamples / numQuantiles;
-            if(toSkip == 0) {
+            if(toSkip < 1) {
                 // numSamples is < numQuantiles;
                 // set numQuantiles to numSamples
                 numQuantiles = (int)numSamples;

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java Thu Nov 27 12:49:54 2014
@@ -22,13 +22,12 @@ import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.UDFContext;
 
 
@@ -56,16 +55,19 @@ public class GFCross extends EvalFunc<Da
             parallelism = DEFAULT_PARALLELISM;
             Configuration cfg = UDFContext.getUDFContext().getJobConf();
             if (cfg != null) {
-                String s = cfg.get(PigConfiguration.PIG_CROSS_PARALLELISM_HINT + "." + crossKey);
+                String s = cfg.get(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey);
                 if (s == null) {
                     throw new IOException("Unable to get parallelism hint from job conf");
                 }
                 parallelism = Integer.valueOf(s);
+                if (parallelism < 0) {
+                    throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey  + " was " + parallelism);
+                }
             }
 
             numInputs = (Integer)input.get(0);
             myNumber = (Integer)input.get(1);
-        
+
             numGroupsPerInput = (int) Math.ceil(Math.pow(parallelism, 1.0/numInputs));
             numGroupsGoingTo = (int) Math.pow(numGroupsPerInput,numInputs - 1);
         }
@@ -73,21 +75,21 @@ public class GFCross extends EvalFunc<Da
         DataBag output = mBagFactory.newDefaultBag();
 
         try{
-               
+
             int[] digits = new int[numInputs];
             digits[myNumber] = r.nextInt(numGroupsPerInput);
 
             for (int i=0; i<numGroupsGoingTo; i++){
                 output.add(toTuple(digits));
                 next(digits);
-            }            
-    
+            }
+
             return output;
         }catch(ExecException e){
             throw e;
         }
     }
-    
+
     private Tuple toTuple(int[] digits) throws IOException, ExecException{
         Tuple t = mTupleFactory.newTuple(numInputs);
         for (int i=0; i<numInputs; i++){
@@ -95,7 +97,7 @@ public class GFCross extends EvalFunc<Da
         }
         return t;
     }
-    
+
     private void next(int[] digits){
         for (int i=0; i<numInputs; i++){
             if (i== myNumber)

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Thu Nov 27 12:49:54 2014
@@ -172,8 +172,8 @@ public class PoissonSampleLoader extends
         newSample = null;
 
         Configuration conf = split.getConf();
-        sampleRate = conf.getInt(PigConfiguration.SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
-        heapPerc = conf.getFloat(PigConfiguration.PERC_MEM_AVAIL,
+        sampleRate = conf.getInt(PigConfiguration.PIG_POISSON_SAMPLER_SAMPLE_RATE, DEFAULT_SAMPLE_RATE);
+        heapPerc = conf.getFloat(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE,
                 PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE);
     }
 

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/StreamingUDF.java Thu Nov 27 12:49:54 2014
@@ -206,7 +206,7 @@ public class StreamingUDF extends EvalFu
                 filePath.substring(0, lastSeparator - 1);
         command[UDF_NAME] = funcName;
         String fileCachePath = jobDir + filePath.substring(0, lastSeparator);
-        command[PATH_TO_FILE_CACHE] = "\"" + fileCachePath + "\"";
+        command[PATH_TO_FILE_CACHE] = "'" + fileCachePath + "'";
         command[STD_OUT_OUTPUT_PATH] = outFileName;
         command[STD_ERR_OUTPUT_PATH] = errOutFileName;
         command[CONTROLLER_LOG_FILE_PATH] = controllerLogFileName;
@@ -227,7 +227,8 @@ public class StreamingUDF extends EvalFu
 
         File userUdfFile = new File(fileCachePath + command[UDF_FILE_NAME] + getUserFileExtension());
         if (!userUdfFile.exists()) {
-            String absolutePath = filePath.startsWith("/") ? filePath : File.separator + filePath;
+            String absolutePath = filePath.startsWith("/") ? filePath : "/" + filePath;
+            absolutePath = absolutePath.replaceAll(":", "");
             String controllerDir = new File(command[PATH_TO_CONTROLLER_FILE]).getParent();
             String userUdfPath = controllerDir + absolutePath + getUserFileExtension();
             userUdfFile = new File(userUdfPath);

Modified: pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/FileLocalizer.java Thu Nov 27 12:49:54 2014
@@ -478,15 +478,15 @@ public class FileLocalizer {
      * since resourthPath should be available in the entire session
      *
      * @param pigContext
-     * @return
+     * @return temporary resource path
      * @throws DataStorageException
      */
-    public static synchronized ContainerDescriptor getTemporaryResourcePath(final PigContext pigContext)
+    public static synchronized Path getTemporaryResourcePath(final PigContext pigContext)
             throws DataStorageException {
         if (resourcePath == null) {
             resourcePath = getTempContainer(pigContext);
         }
-        return resourcePath;
+        return ((HPath)resourcePath).getPath();
     }
 
     private static synchronized ContainerDescriptor getTempContainer(final PigContext pigContext)
@@ -787,6 +787,9 @@ public class FileLocalizer {
                                             boolean multipleFiles) throws IOException {
 
         Path path = new Path(filePath);
+        if (path.getName().isEmpty()) {
+            return new FetchFileRet[0];
+        }
         URI uri = path.toUri();
         Configuration conf = new Configuration();
         ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(properties));
@@ -800,7 +803,7 @@ public class FileLocalizer {
                 && uri.getScheme() == null )||
                 // For Windows local files
                 (uri.getScheme() == null && uri.getPath().matches("^/[A-Za-z]:.*")) ||
-                (uri.getScheme() != null && uri.getScheme().equals("local")) 
+                (uri.getScheme() != null && uri.getScheme().equals("local"))
             ) {
             srcFs = localFs;
         } else {
@@ -859,14 +862,20 @@ public class FileLocalizer {
         dest.getParentFile().mkdirs();
         dest.deleteOnExit();
 
-        OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(dest));
-        byte[] buffer = new byte[1024];
-        int len;
-        while ((len=resourceStream.read(buffer)) > 0) {
-          outputStream.write(buffer,0,len);
+        OutputStream outputStream = null;
+        try {
+            outputStream = new BufferedOutputStream(new FileOutputStream(dest));
+            byte[] buffer = new byte[1024];
+            int len;
+            while ((len=resourceStream.read(buffer)) > 0) {
+              outputStream.write(buffer,0,len);
+            }
+        } finally {
+            resourceStream.close();
+            if (outputStream != null) {
+                outputStream.close();
+            }
         }
-        outputStream.close();
-
         localFileRet = new FetchFileRet(dest,false);
       }
       else

Modified: pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java Thu Nov 27 12:49:54 2014
@@ -213,6 +213,9 @@ public class ReadToEndLoader extends Loa
         // input completely
         PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1, 
                 new ArrayList<OperatorKey>(), -1);
+        // Set the conf object so that if the wrappedLoadFunc uses it,
+        // it won't be null
+        pigSplit.setConf(conf);
         wrappedLoadFunc.prepareToRead(reader, pigSplit);
         return true;
     }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java Thu Nov 27 12:49:54 2014
@@ -47,10 +47,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.tools.bzip2r.BZip2Constants;
-import org.codehaus.jackson.annotate.JsonPropertyOrder;
-import org.codehaus.jackson.map.annotate.JacksonStdImpl;
 import org.joda.time.DateTime;
 
 import com.google.common.collect.Multimaps;
@@ -68,8 +67,6 @@ public class JarManager {
         AUTOMATON(Automaton.class),
         ANTLR(CommonTokenStream.class),
         GUAVA(Multimaps.class),
-        JACKSON_CORE(JsonPropertyOrder.class),
-        JACKSON_MAPPER(JacksonStdImpl.class),
         JODATIME(DateTime.class);
 
         private final Class pkgClass;
@@ -92,9 +89,16 @@ public class JarManager {
         createPigScriptUDFJar(fos, pigContext, contents);
 
         if (!contents.isEmpty()) {
-            FileInputStream fis = new FileInputStream(scriptUDFJarFile);
-            String md5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(fis);
-            fis.close();
+            FileInputStream fis = null;
+            String md5 = null;
+            try {
+                fis = new FileInputStream(scriptUDFJarFile);
+                md5 = org.apache.commons.codec.digest.DigestUtils.md5Hex(fis);
+            } finally {
+                if (fis != null) {
+                    fis.close();
+                }
+            }
             File newScriptUDFJarFile = new File(scriptUDFJarFile.getParent(), "PigScriptUDF-" + md5 + ".jar");
             scriptUDFJarFile.renameTo(newScriptUDFJarFile);
             return newScriptUDFJarFile;
@@ -107,15 +111,20 @@ public class JarManager {
         for (String path: pigContext.scriptFiles) {
             log.debug("Adding entry " + path + " to job jar" );
             InputStream stream = null;
-            if (new File(path).exists()) {
-                stream = new FileInputStream(new File(path));
+            File inputFile = new File(path);
+            if (inputFile.exists()) {
+                stream = new FileInputStream(inputFile);
             } else {
                 stream = PigContext.getClassLoader().getResourceAsStream(path);
             }
             if (stream==null) {
                 throw new IOException("Cannot find " + path);
             }
-            addStream(jarOutputStream, path, stream, contents);
+            try {
+                addStream(jarOutputStream, path, stream, contents, inputFile.lastModified());
+            } finally {
+                stream.close();
+            }
         }
         for (Map.Entry<String, File> entry : pigContext.getScriptFiles().entrySet()) {
             log.debug("Adding entry " + entry.getKey() + " to job jar" );
@@ -128,7 +137,11 @@ public class JarManager {
             if (stream==null) {
                 throw new IOException("Cannot find " + entry.getValue().getPath());
             }
-            addStream(jarOutputStream, entry.getKey(), stream, contents);
+            try {
+                addStream(jarOutputStream, entry.getKey(), stream, contents, entry.getValue().lastModified());
+            } finally {
+                stream.close();
+            }
         }
         if (!contents.isEmpty()) {
             jarOutputStream.close();
@@ -139,7 +152,7 @@ public class JarManager {
 
     /**
      * Creates a Classloader based on the passed jarFile and any extra jar files.
-     * 
+     *
      * @param jarFile
      *            the jar file to be part of the newly created Classloader. This jar file plus any
      *            jars in the extraJars list will constitute the classpath.
@@ -161,7 +174,7 @@ public class JarManager {
 
      /**
      * Adds a stream to a Jar file.
-     * 
+     *
      * @param os
      *            the OutputStream of the Jar file to which the stream will be added.
      * @param name
@@ -171,15 +184,20 @@ public class JarManager {
      * @param contents
      *            the current contents of the Jar file. (We use this to avoid adding two streams
      *            with the same name.
+     * @param timestamp
+     *            timestamp of the entry
      * @throws IOException
      */
-    private static void addStream(JarOutputStream os, String name, InputStream is, Map<String, String> contents)
+    private static void addStream(JarOutputStream os, String name, InputStream is, Map<String, String> contents,
+            long timestamp)
             throws IOException {
         if (contents.get(name) != null) {
             return;
         }
         contents.put(name, "");
-        os.putNextEntry(new JarEntry(name));
+        JarEntry entry = new JarEntry(name);
+        entry.setTime(timestamp);
+        os.putNextEntry(entry);
         byte buffer[] = new byte[4096];
         int rc;
         while ((rc = is.read(buffer)) > 0) {
@@ -190,6 +208,9 @@ public class JarManager {
     public static List<String> getDefaultJars() {
         List<String> defaultJars = new ArrayList<String>();
         for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
+            if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) {
+                continue; //Skip
+            }
             String jar = findContainingJar(pkgToSend.getPkgClass());
             if (!defaultJars.contains(jar)) {
                 defaultJars.add(jar);
@@ -201,7 +222,7 @@ public class JarManager {
     /**
      * Find a jar that contains a class of the same name, if any. It will return a jar file, even if
      * that is not the first thing on the class path that has a class with the same name.
-     * 
+     *
      * @param my_class
      *            the class to find
      * @return a jar file that contains the class, or null
@@ -243,12 +264,12 @@ public class JarManager {
         }
         return null;
     }
-    
+
     /**
      * Add the jars containing the given classes to the job's configuration
      * such that JobClient will ship them to the cluster and add them to
      * the DistributedCache
-     * 
+     *
      * @param job
      *           Job object
      * @param classes
@@ -266,10 +287,10 @@ public class JarManager {
             return;
         conf.set("tmpjars", StringUtils.arrayToString(jars.toArray(new String[0])));
     }
-    
+
     /**
-     * Add the qualified path name of jars containing the given classes 
-     * 
+     * Add the qualified path name of jars containing the given classes
+     *
      * @param fs
      *            FileSystem object
      * @param jars

Modified: pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/PropertiesUtil.java Thu Nov 27 12:49:54 2014
@@ -145,9 +145,9 @@ public class PropertiesUtil {
             properties.setProperty("stop.on.failure", ""+false);
         }
 
-        if (properties.getProperty(PigConfiguration.OPT_FETCH) == null) {
+        if (properties.getProperty(PigConfiguration.PIG_OPT_FETCH) == null) {
             //by default fetch optimization is on
-            properties.setProperty(PigConfiguration.OPT_FETCH, ""+true);
+            properties.setProperty(PigConfiguration.PIG_OPT_FETCH, ""+true);
         }
     }
     

Modified: pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/SpillableMemoryManager.java Thu Nov 27 12:49:54 2014
@@ -43,38 +43,38 @@ import org.apache.commons.logging.LogFac
  * <p>
  * Low memory is defined as more than 50% of the tenured pool being allocated. Spillable objects are
  * tracked using WeakReferences so that the objects can be GCed even though this class has a reference
- * to them. 
+ * to them.
  *
  */
 public class SpillableMemoryManager implements NotificationListener {
-    
+
     private final Log log = LogFactory.getLog(getClass());
-    
+
     LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
-    
-    // if we freed at least this much, invoke GC 
+
+    // if we freed at least this much, invoke GC
     // (default 40 MB - this can be overridden by user supplied property)
     private static long gcActivationSize = 40000000L ;
-    
+
     // spill file size should be at least this much
     // (default 5MB - this can be overridden by user supplied property)
     private static long spillFileSizeThreshold = 5000000L ;
-    
+
     // this will keep track of memory freed across spills
     // and between GC invocations
     private static long accumulatedFreeSize = 0L;
-    
+
     // fraction of biggest heap for which we want to get
     // "memory usage threshold exceeded" notifications
     private static double memoryThresholdFraction = 0.7;
-    
+
     // fraction of biggest heap for which we want to get
     // "collection threshold exceeded" notifications
     private static double collectionMemoryThresholdFraction = 0.5;
-        
+
     // log notification on usage threshold exceeded only the first time
     private boolean firstUsageThreshExceededLogged = false;
-    
+
     // log notification on collection threshold exceeded only the first time
     private boolean firstCollectionThreshExceededLogged = false;
 
@@ -82,54 +82,52 @@ public class SpillableMemoryManager impl
     // if we want to perform an extra gc before the spill
     private static double extraGCThresholdFraction = 0.05;
     private static long extraGCSpillSizeThreshold  = 0L;
-    
+
     private static volatile SpillableMemoryManager manager;
 
     private SpillableMemoryManager() {
         ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
         List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
-        MemoryPoolMXBean biggestHeap = null;
-        long biggestSize = 0;
+        MemoryPoolMXBean tenuredHeap = null;
+        long tenuredHeapSize = 0;
         long totalSize = 0;
-        for (MemoryPoolMXBean b: mpbeans) {
-            log.debug("Found heap (" + b.getName() +
-                ") of type " + b.getType());
-            if (b.getType() == MemoryType.HEAP) {
-                /* Here we are making the leap of faith that the biggest
-                 * heap is the tenured heap
-                 */
-                long size = b.getUsage().getMax();
+        for (MemoryPoolMXBean pool : mpbeans) {
+            log.debug("Found heap (" + pool.getName() + ") of type " + pool.getType());
+            if (pool.getType() == MemoryType.HEAP) {
+                long size = pool.getUsage().getMax();
                 totalSize += size;
-                if (size > biggestSize) {
-                    biggestSize = size;
-                    biggestHeap = b;
+                // CMS Old Gen or "tenured" is the only heap that supports
+                // setting usage threshold.
+                if (pool.isUsageThresholdSupported()) {
+                    tenuredHeapSize = size;
+                    tenuredHeap = pool;
                 }
             }
         }
         extraGCSpillSizeThreshold  = (long) (totalSize * extraGCThresholdFraction);
-        if (biggestHeap == null) {
+        if (tenuredHeap == null) {
             throw new RuntimeException("Couldn't find heap");
         }
         log.debug("Selected heap to monitor (" +
-            biggestHeap.getName() + ")");
-        
-        // we want to set both collection and usage threshold alerts to be 
+            tenuredHeap.getName() + ")");
+
+        // we want to set both collection and usage threshold alerts to be
         // safe. In some local tests after a point only collection threshold
         // notifications were being sent though usage threshold notifications
         // were sent early on. So using both would ensure that
         // 1) we get notified early (though usage threshold exceeded notifications)
         // 2) we get notified always when threshold is exceeded (either usage or
         //    collection)
-        
+
         /* We set the threshold to be 50% of tenured since that is where
          * the GC starts to dominate CPU time according to Sun doc */
-        biggestHeap.setCollectionUsageThreshold((long)(biggestSize * collectionMemoryThresholdFraction));
+        tenuredHeap.setCollectionUsageThreshold((long)(tenuredHeapSize * collectionMemoryThresholdFraction));
         // we set a higher threshold for usage threshold exceeded notification
         // since this is more likely to be effective sooner and we do not
         // want to be spilling too soon
-        biggestHeap.setUsageThreshold((long)(biggestSize * memoryThresholdFraction));
+        tenuredHeap.setUsageThreshold((long)(tenuredHeapSize * memoryThresholdFraction));
     }
-    
+
     public static SpillableMemoryManager getInstance() {
         if (manager == null) {
             manager = new SpillableMemoryManager();
@@ -138,21 +136,21 @@ public class SpillableMemoryManager impl
     }
 
     public static void configure(Properties properties) {
-        
+
         try {
-            
+
             spillFileSizeThreshold = Long.parseLong(
                     properties.getProperty("pig.spill.size.threshold") ) ;
-            
+
             gcActivationSize = Long.parseLong(
                     properties.getProperty("pig.spill.gc.activation.size") ) ;
-        } 
+        }
         catch (NumberFormatException  nfe) {
             throw new RuntimeException("Error while converting system configurations" +
             		"spill.size.threshold, spill.gc.activation.size", nfe) ;
         }
     }
-    
+
     @Override
     public void handleNotification(Notification n, Object o) {
         CompositeData cd = (CompositeData) n.getUserData();
@@ -166,7 +164,7 @@ public class SpillableMemoryManager impl
             toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5);
 
             //log
-            String msg = "memory handler call- Usage threshold " 
+            String msg = "memory handler call- Usage threshold "
                 + info.getUsage();
             if(!firstUsageThreshExceededLogged){
                 log.info("first " + msg);
@@ -177,7 +175,7 @@ public class SpillableMemoryManager impl
         } else { // MEMORY_COLLECTION_THRESHOLD_EXCEEDED CASE
             long threshold = (long)(info.getUsage().getMax() * collectionMemoryThresholdFraction);
             toFree = info.getUsage().getUsed() - threshold + (long)(threshold * 0.5);
-            
+
             //log
             String msg = "memory handler call - Collection threshold "
                 + info.getUsage();
@@ -191,7 +189,7 @@ public class SpillableMemoryManager impl
         }
         clearSpillables();
         if (toFree < 0) {
-            log.debug("low memory handler returning " + 
+            log.debug("low memory handler returning " +
                 "because there is nothing to free");
             return;
         }
@@ -203,7 +201,7 @@ public class SpillableMemoryManager impl
                  * becomes null, but it will be close enough.
                  * Also between the time we sort and we use these spillables, they
                  * may actually change in size - so this is just best effort
-                 */    
+                 */
                 @Override
                 public int compare(WeakReference<Spillable> o1Ref, WeakReference<Spillable> o2Ref) {
                     Spillable o1 = o1Ref.get();
@@ -219,7 +217,7 @@ public class SpillableMemoryManager impl
                     }
                     long o1Size = o1.getMemorySize();
                     long o2Size = o2.getMemorySize();
-                
+
                     if (o1Size == o2Size) {
                         return 0;
                     }
@@ -254,8 +252,10 @@ public class SpillableMemoryManager impl
                 // we force GC to make sure we really need to keep this
                 // object before paying for the expensive spill().
                 // Done at most once per handleNotification.
+                // Do not invoke extraGC for GroupingSpillable. Its size will always exceed
+                // extraGCSpillSizeThreshold and the data is always strongly referenced.
                 if( !extraGCCalled && extraGCSpillSizeThreshold != 0
-                    && toBeFreed > extraGCSpillSizeThreshold   ) {
+                    && toBeFreed > extraGCSpillSizeThreshold  && !(s instanceof GroupingSpillable)) {
                     log.debug("Single spillable has size " + toBeFreed + "bytes. Calling extra gc()");
                     // this extra assignment to null is needed so that gc can free the
                     // spillable if nothing else is pointing at it
@@ -271,7 +271,7 @@ public class SpillableMemoryManager impl
                         continue;
                     }
                 }
-                s.spill();               
+                s.spill();
                 numObjSpilled++;
                 estimatedFreed += toBeFreed;
                 accumulatedFreeSize += toBeFreed;
@@ -280,13 +280,13 @@ public class SpillableMemoryManager impl
                 if (accumulatedFreeSize > gcActivationSize) {
                     invokeGC = true;
                 }
-                
+
                 if (estimatedFreed > toFree) {
                     log.debug("Freed enough space - getting out of memory handler");
                     invokeGC = true;
                     break;
                 }
-            }           
+            }
             /* Poke the GC again to see if we successfully freed enough memory */
             if(invokeGC) {
                 System.gc();
@@ -301,7 +301,7 @@ public class SpillableMemoryManager impl
 
         }
     }
-    
+
     public void clearSpillables() {
         synchronized (spillables) {
             // Walk the list first and remove nulls, otherwise the sort

Modified: pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/UDFContext.java Thu Nov 27 12:49:54 2014
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 
 public class UDFContext {
@@ -41,6 +43,10 @@ public class UDFContext {
         }
     };
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(UDFContext.class);
+    }
+
     private UDFContext() {
         udfConfs = new HashMap<UDFContextKey, Properties>();
     }
@@ -62,7 +68,8 @@ public class UDFContext {
     /*
      *  internal pig use only - should NOT be called from user code
      */
-    public static void destroy() {
+    @StaticDataCleanup
+    public static void cleanupStaticData() {
         tss = new ThreadLocal<UDFContext>() {
             @Override
             public UDFContext initialValue() {

Modified: pig/branches/spark/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/Utils.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/Utils.java Thu Nov 27 12:49:54 2014
@@ -23,10 +23,8 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.SequenceInputStream;
-import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
@@ -42,12 +40,10 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobConf;
@@ -65,7 +61,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.ReadToEndLoader;
@@ -76,6 +71,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.parser.ParserException;
 import org.apache.pig.parser.QueryParserDriver;
+import org.joda.time.DateTimeZone;
 
 import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
@@ -85,17 +81,32 @@ import com.google.common.primitives.Long
  */
 public class Utils {
     private static final Log log = LogFactory.getLog(Utils.class);
-    
+    private static final Pattern JAVA_MAXHEAPSIZE_PATTERN = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
+
+
     /**
      * This method checks whether JVM vendor is IBM
      * @return true if IBM JVM is being used
      * false otherwise
      */
-    public static boolean isVendorIBM() {    	
+    public static boolean isVendorIBM() {
     	  return System.getProperty("java.vendor").contains("IBM");
     }
-    
-    
+
+    public static boolean isHadoop23() {
+        String version = org.apache.hadoop.util.VersionInfo.getVersion();
+        if (version.matches("\\b0\\.23\\..+\\b"))
+            return true;
+        return false;
+    }
+
+    public static boolean isHadoop2() {
+        String version = org.apache.hadoop.util.VersionInfo.getVersion();
+        if (version.matches("\\b2\\.\\d+\\..+"))
+            return true;
+        return false;
+    }
+
     /**
      * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
      * checks if two objects are equals - two levels of checks are
@@ -238,7 +249,7 @@ public class Utils {
     }
 
     public static LogicalSchema parseSchema(String schemaString) throws ParserException {
-        QueryParserDriver queryParser = new QueryParserDriver( new PigContext(), 
+        QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
                 "util", new HashMap<String, String>() ) ;
         LogicalSchema schema = queryParser.parseSchema(schemaString);
         return schema;
@@ -249,7 +260,7 @@ public class Utils {
      * field. This will be called only when PigStorage is invoked with
      * '-tagFile' or '-tagPath' option and the schema file is present to be
      * loaded.
-     * 
+     *
      * @param schema
      * @param fieldName
      * @return ResourceSchema
@@ -383,7 +394,7 @@ public class Utils {
         } else if (TEMPFILE_STORAGE.TFILE.lowerName().equals(tmpFileCompressionStorage)) {
             return TEMPFILE_STORAGE.TFILE;
         } else {
-            throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage + 
+            throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage +
                     ". Should be one of " + Arrays.toString(TEMPFILE_STORAGE.values()));
         }
     }
@@ -582,7 +593,7 @@ public class Utils {
             // substitute
             eval = eval.substring(0, match.start())+val+eval.substring(match.end());
         }
-        throw new IllegalStateException("Variable substitution depth too large: " 
+        throw new IllegalStateException("Variable substitution depth too large: "
                 + MAX_SUBST + " " + expr);
     }
 
@@ -648,4 +659,35 @@ public class Utils {
       return null;
 
     }
+
+    public static int extractHeapSizeInMB(String input) {
+        int ret = 0;
+        if(input == null || input.equals(""))
+            return ret;
+        Matcher m = JAVA_MAXHEAPSIZE_PATTERN.matcher(input);
+        String heapStr = null;
+        String heapNum = null;
+        // Grab the last match, which is the one that takes effect (in case multiple -Xmx options are specified)
+        while (m.find()) {
+            heapStr = m.group(1);
+            heapNum = m.group(2);
+        }
+        if (heapStr != null) {
+            // when -Xmx is specified in gigabytes, convert to megabytes
+            if(heapStr.endsWith("g") || heapStr.endsWith("G")) {
+                ret = Integer.parseInt(heapNum) * 1024;
+            } else {
+                ret = Integer.parseInt(heapNum);
+            }
+        }
+        return ret;
+    }
+
+    public static void setDefaultTimeZone(Configuration conf) {
+        String dtzStr = conf.get(PigConfiguration.PIG_DATETIME_DEFAULT_TIMEZONE);
+        if (dtzStr != null && dtzStr.length() > 0) {
+            // don't use offsets because they break across DST/Standard Time
+            DateTimeZone.setDefault(DateTimeZone.forID(dtzStr));
+        }
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/orc/OrcUtils.java Thu Nov 27 12:49:54 2014
@@ -27,7 +27,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.pig.PigWarning;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -57,6 +60,7 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 import org.joda.time.DateTime;
 
 public class OrcUtils {
@@ -92,7 +96,14 @@ public class OrcUtils {
             for (Map.Entry<Object, Object> entry : m.entrySet()) {
                 Object convertedKey = convertOrcToPig(entry.getKey(), keyObjectInspector, null);
                 Object convertedValue = convertOrcToPig(entry.getValue(), valueObjectInspector, null);
-                ((Map)result).put(convertedKey.toString(), convertedValue);
+                if (convertedKey!=null) {
+                    ((Map)result).put(convertedKey.toString(), convertedValue);
+                } else {
+                    PigStatusReporter reporter = PigStatusReporter.getInstance();
+                    if (reporter != null) {
+                       reporter.incrCounter(PigWarning.UDF_WARNING_1, 1);
+                    }
+                }
             }
             break;
         case LIST:
@@ -125,6 +136,12 @@ public class OrcUtils {
         case STRING:
             result = poi.getPrimitiveJavaObject(obj);
             break;
+        case CHAR:
+            result = ((HiveChar)poi.getPrimitiveJavaObject(obj)).getValue();
+            break;
+        case VARCHAR:
+            result = ((HiveVarchar)poi.getPrimitiveJavaObject(obj)).getValue();
+            break;
         case BYTE:
             result = (int)(Byte)poi.getPrimitiveJavaObject(obj);
             break;
@@ -222,6 +239,12 @@ public class OrcUtils {
             case STRING:
                 fieldSchema.setType(DataType.CHARARRAY);
                 break;
+            case VARCHAR:
+                fieldSchema.setType(DataType.CHARARRAY);
+                break;
+            case CHAR:
+                fieldSchema.setType(DataType.CHARARRAY);
+                break;
             case TIMESTAMP:
                 fieldSchema.setType(DataType.DATETIME);
                 break;

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Thu Nov 27 12:49:54 2014
@@ -512,7 +512,7 @@ public class ExpToPhyTranslationVisitor 
             }
             List<String> cacheFiles = ((EvalFunc)f).getCacheFiles();
             if (cacheFiles != null) {
-                ((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()]));
+                ((POUserFunc)p).setCacheFiles(cacheFiles);
             }
         } else {
             p = new POUserComparisonFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Thu Nov 27 12:49:54 2014
@@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreResources;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -142,6 +143,8 @@ public class LogToPhyTranslationVisitor 
         load.setSignature(loLoad.getSignature());
         load.setLimit(loLoad.getLimit());
         load.setIsTmpLoad(loLoad.isTmpLoad());
+        load.setCacheFiles(loLoad.getLoadFunc().getCacheFiles());
+        load.setShipFiles(loLoad.getLoadFunc().getShipFiles());
 
         currentPlan.add(load);
         logToPhyMap.put(loLoad, load);
@@ -631,6 +634,7 @@ public class LogToPhyTranslationVisitor 
                     List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
 
                     POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelism(), fePlans, flattenLst );
+                    fe.setMapSideOnly(true);
                     fe.addOriginalLocation(cross.getAlias(), cross.getLocation());
                     currentPlan.add(fe);
                     currentPlan.connect(logToPhyMap.get(op), fe);
@@ -953,8 +957,11 @@ public class LogToPhyTranslationVisitor 
         store.setSortInfo(loStore.getSortInfo());
         store.setIsTmpStore(loStore.isTmpStore());
         store.setStoreFunc(loStore.getStoreFunc());
-
         store.setSchema(Util.translateSchema( loStore.getSchema() ));
+        if (loStore.getStoreFunc() instanceof StoreResources) {
+            store.setCacheFiles(((StoreResources)loStore.getStoreFunc()).getCacheFiles());
+            store.setShipFiles(((StoreResources)loStore.getStoreFunc()).getShipFiles());
+        }
 
         currentPlan.add(store);