Posted to commits@pig.apache.org by da...@apache.org on 2014/05/25 04:33:24 UTC

svn commit: r1597371 [1/2] - in /pig/branches/tez: ./ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache...

Author: daijy
Date: Sun May 25 02:33:23 2014
New Revision: 1597371

URL: http://svn.apache.org/r1597371
Log:
Merge latest trunk changes

Added:
    pig/branches/tez/shims/src/hadoop23/org/apache/hadoop/
      - copied from r1597370, pig/trunk/shims/src/hadoop23/org/apache/hadoop/
Removed:
    pig/branches/tez/src/org/apache/pig/newplan/PColFilterExtractor.java
Modified:
    pig/branches/tez/   (props changed)
    pig/branches/tez/CHANGES.txt
    pig/branches/tez/build.xml
    pig/branches/tez/conf/pig.properties
    pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java
    pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java
    pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java
    pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java
    pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml
    pig/branches/tez/src/org/apache/pig/PigConfiguration.java
    pig/branches/tez/src/org/apache/pig/PigConstants.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
    pig/branches/tez/src/org/apache/pig/builtin/JsonMetadata.java
    pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java
    pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
    pig/branches/tez/src/org/apache/pig/impl/util/Utils.java
    pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
    pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
    pig/branches/tez/test/org/apache/pig/TestMain.java
    pig/branches/tez/test/org/apache/pig/test/TestPigStorage.java

Propchange: pig/branches/tez/
------------------------------------------------------------------------------
  Merged /pig/trunk:r1596060-1597370

Modified: pig/branches/tez/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/tez/CHANGES.txt?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/CHANGES.txt (original)
+++ pig/branches/tez/CHANGES.txt Sun May 25 02:33:23 2014
@@ -21,6 +21,16 @@ Pig Change Log
 Trunk (unreleased changes)
  
 INCOMPATIBLE CHANGES
+ 
+IMPROVEMENTS
+ 
+OPTIMIZATIONS
+ 
+BUG FIXES
+
+Release 0.13.0 - Unreleased
+ 
+INCOMPATIBLE CHANGES
 
 PIG-3898: Refactor PPNL for non-MR execution engine (cheolsoo)
 
@@ -32,6 +42,12 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3929: pig.temp.dir should allow to substitute vars as hadoop configuration does (aniket486)
+
+PIG-3913: Pig should use job's jobClient wherever possible (fixes local mode counters) (aniket486)
+
+PIG-3941: Piggybank's Over UDF returns an output schema with named fields (mrflip via cheolsoo)
+
 PIG-3545: Seperate validation rules from optimizer (daijy)
 
 PIG-3745: Document auto local mode for pig (aniket486)
@@ -153,8 +169,6 @@ PIG-3905: 0.12.1 release can't be build 
 
 PIG-3894: Datetime function AddDuration, SubtractDuration and all Between functions don't check for null values in the input tuple (jennythompson via cheolsoo)
 
-PIG-3772: Syntax error when casting an inner schema of a bag and line break involved (ssvinarchukhorton via daijy)
-
 PIG-3889: Direct fetch doesn't set job submission timestamps (cheolsoo)
 
 PIG-3895: Pigmix run script has compilation error (rohini)

Modified: pig/branches/tez/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/build.xml?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/build.xml (original)
+++ pig/branches/tez/build.xml Sun May 25 02:33:23 2014
@@ -31,7 +31,7 @@
     <property name="pigsmoke.pom" value="${basedir}/ivy/pigsmoke.pom" />
     <property name="pigunit.pom" value="${basedir}/ivy/pigunit.pom" />
     <property name="piggybank.pom" value="${basedir}/ivy/piggybank.pom" />
-    <property name="pig.version" value="0.13.0" />
+    <property name="pig.version" value="0.14.0" />
     <property name="pig.version.suffix" value="-SNAPSHOT" />
     <property name="version" value="${pig.version}${pig.version.suffix}" />
     <property name="final.name" value="${name}-${version}" />

Modified: pig/branches/tez/conf/pig.properties
URL: http://svn.apache.org/viewvc/pig/branches/tez/conf/pig.properties?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/conf/pig.properties (original)
+++ pig/branches/tez/conf/pig.properties Sun May 25 02:33:23 2014
@@ -63,6 +63,8 @@
 #
 # pig.pretty.print.schema=false
 
+# === Profiling UDFs  ===
+
 # Turn on UDF timers? This will cause two counters to be
 # tracked for every UDF and LoadFunc in your script: approx_microsecs measures
 # approximate time spent inside a UDF approx_invocations reports the approximate
@@ -74,6 +76,9 @@
 #
 # pig.udf.profile=false
 
+# Specify frequency of profiling (default: every 100th).
+# pig.udf.profile.frequency=100
+
 ############################################################################
 #
 # == Site-specific Properties
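
For illustration, a minimal sketch (not part of the patch above; file name and query are hypothetical) of enabling these two profiling properties programmatically. The keys match the pig.udf.profile and pig.udf.profile.frequency properties this commit documents:

    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class UdfProfilingExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Track approx_microsecs/approx_invocations counters for every UDF and LoadFunc.
            props.setProperty("pig.udf.profile", "true");
            // Sample every 50th invocation instead of the default 100th.
            props.setProperty("pig.udf.profile.frequency", "50");
            PigServer pig = new PigServer(ExecType.LOCAL, props);
            pig.registerQuery("A = LOAD 'input.txt' AS (line:chararray);");
            pig.store("A", "output");
        }
    }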

Modified: pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java Sun May 25 02:33:23 2014
@@ -131,29 +131,50 @@ import org.apache.pig.impl.logicalLayer.
  *
  * <p>Example Usage:
  * <p>To do a cumulative sum:
- * <p><pre> A = load 'T';
- * B = group A by si
- * C = foreach B {
- *     C1 = order A by d;
- *     generate flatten(Stitch(C1, Over(C1.f, 'sum(float)')));
+ * <p><pre> A = load 'T' AS (si:chararray, i:int, d:long, f:float, s:chararray);
+ * C = foreach (group A by si) {
+ *     Aord = order A by d;
+ *     generate flatten(Stitch(Aord, Over(Aord.f, 'sum(float)')));
  * }
- * D = foreach C generate s, $9;</pre>
+ * D = foreach C generate s, $5;</pre>
  * <p> This is equivalent to the SQL statement
  * <p><tt>select s, sum(f) over (partition by si order by d) from T;</tt>
  *
  * <p>To find the record 3 ahead of the current record, using a window between
  * the current row and 3 records ahead and a default value of 0.
- * <p><pre> A = load 'T';
- * B = group A by si;
- * C = foreach B {
- *     C1 = order A by i;
- *     generate flatten(Stitch(C1, Over(C1.i, 'lead', 0, 3, 3, 0)));
+ * <p><pre> A = load 'T' AS (si:chararray, i:int, d:long, f:float, s:chararray);
+ * C = foreach (group A by si) {
+ *     Aord = order A by i;
+ *     generate flatten(Stitch(Aord, Over(Aord.i, 'lead', 0, 3, 3, 0)));
  * }
  * D = foreach C generate s, $9;</pre>
  * <p> This is equivalent to the SQL statement
  * <p><tt>select s, lead(i, 3, 0) over (partition by si order by i rows between
-         * current row and 3 following) over T;</tt>
+ * current row and 3 following) over T;</tt>
+ *
+ * <p>Over accepts a constructor argument specifying the name and type,
+ * colon-separated, of its return schema.</p>
  *
+ * <p><pre>
+ * DEFINE IOver org.apache.pig.piggybank.evaluation.Over('state_rk:int');
+ * cities = LOAD 'cities' AS (city:chararray, state:chararray, pop:int);
+ * -- Decorate each city with its population rank within the state it belongs to:
+ * ranked = FOREACH(GROUP cities BY state) {
+ *   c_ord = ORDER cities BY pop DESC;
+ *   GENERATE FLATTEN(Stitch(c_ord,
+ *     IOver(c_ord, 'rank', -1, -1, 2))); -- beginning (-1) to end (-1) on third field (2)
+ * };
+ * DESCRIBE ranked;
+ * -- ranked: {stitched::city: chararray,stitched::state: chararray,stitched::pop: int,stitched::state_rk: int}
+ * DUMP ranked;
+ * -- ...
+ * -- (Nashville,Tennessee,609644,2)
+ * -- (Houston,Texas,2145146,1)
+ * -- (San Antonio,Texas,1359758,2)
+ * -- (Dallas,Texas,1223229,3)
+ * -- (Austin,Texas,820611,4)
+ * -- ...
+ * </pre></p>
  */
 public class Over extends EvalFunc<DataBag> {
 
@@ -165,7 +186,8 @@ public class Over extends EvalFunc<DataB
     private boolean initialized;
     private EvalFunc<? extends Object> func;
     private Object[] udfArgs;
-    private byte returnType;
+    private byte   returnType;
+    private String returnName;
 
     public Over() {
         initialized = false;
@@ -174,9 +196,16 @@ public class Over extends EvalFunc<DataB
         returnType = DataType.UNKNOWN;
     }
 
-    public Over(String returnType) {
+    public Over(String typespec) {
         this();
-        this.returnType = DataType.findTypeByName(returnType);
+        if (typespec.contains(":")) {
+            String[] fn_tn = typespec.split(":", 2);
+            this.returnName = fn_tn[0];
+            this.returnType = DataType.findTypeByName(fn_tn[1]);
+        } else {
+            this.returnName = "result";
+            this.returnType = DataType.findTypeByName(typespec);
+        }
     }
 
     @Override
@@ -229,7 +258,11 @@ public class Over extends EvalFunc<DataB
             if (returnType == DataType.UNKNOWN) {
                 return Schema.generateNestedSchema(DataType.BAG, DataType.NULL);
             } else {
-                return Schema.generateNestedSchema(DataType.BAG, returnType);
+                Schema outputTupleSchema = new Schema(new Schema.FieldSchema(returnName, returnType));
+                return new Schema(new Schema.FieldSchema(
+                        getSchemaName(this.getClass().getName().toLowerCase(), inputSch),
+                            outputTupleSchema, 
+                            DataType.BAG));
             }
         } catch (FrontendException fe) {
             throw new RuntimeException("Unable to create nested schema", fe);
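
As a sketch of the new constructor behavior (not from the patch above), passing a colon-separated name:type spec produces a named field in the bag's inner tuple schema, matching the TestOver expectations later in this commit:

    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.piggybank.evaluation.Over;

    public class OverSchemaDemo {
        public static void main(String[] args) throws Exception {
            // Name and type, colon-separated; a bare type still works and names the field "result".
            Over over = new Over("state_rk:int");
            Schema in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
            // Prints a bag schema whose inner tuple field is "state_rk: int".
            System.out.println(over.outputSchema(in));
        }
    }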

Modified: pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java Sun May 25 02:33:23 2014
@@ -17,7 +17,13 @@
  */
 package org.apache.pig.piggybank.storage;
 
-import org.joda.time.DateTime;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,14 +34,16 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-
-import java.io.IOException;
-import java.sql.*;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+import org.joda.time.DateTime;
 
 public class DBStorage extends StoreFunc {
   private final Log log = LogFactory.getLog(getClass());
@@ -49,6 +57,11 @@ public class DBStorage extends StoreFunc
   private int count = 0;
   private String insertQuery;
 
+  //We want to store the schema if possible so that we can try to deal with nulls.
+  protected ResourceSchema schema = null;
+  private String udfcSignature = null;
+  private static final String SCHEMA_SIGNATURE = "pig.dbstorage.schema";
+
   public DBStorage(String driver, String jdbcURL, String insertQuery) {
     this(driver, jdbcURL, null, null, insertQuery, "100");
   }
@@ -85,10 +98,15 @@ public class DBStorage extends StoreFunc
       for (int i = 0; i < size; i++) {
         try {
           Object field = tuple.get(i);
-
           switch (DataType.findType(field)) {
           case DataType.NULL:
-            ps.setNull(sqlPos, java.sql.Types.VARCHAR);
+            //Default to varchar
+            int nullSqlType = java.sql.Types.VARCHAR;
+            if (schema != null) {
+                ResourceFieldSchema fs = schema.getFields()[i];
+                nullSqlType = sqlDataTypeFromPigDataType(fs.getType());
+            }
+            ps.setNull(sqlPos, nullSqlType);
             sqlPos++;
             break;
 
@@ -164,9 +182,7 @@ public class DBStorage extends StoreFunc
       }
     } catch (SQLException e) {
       try {
-        log
-            .error("Unable to insert record:" + tuple.toDelimitedString("\t"),
-                e);
+        log.error("Unable to insert record:" + tuple.toDelimitedString("\t"), e);
       } catch (ExecException ee) {
         // do nothing
       }
@@ -180,6 +196,30 @@ public class DBStorage extends StoreFunc
     }
   }
 
+  protected int sqlDataTypeFromPigDataType(byte pigDataType) {
+      switch(pigDataType) {
+      case DataType.INTEGER:
+          return java.sql.Types.INTEGER;
+      case DataType.LONG:
+          return java.sql.Types.BIGINT;
+      case DataType.FLOAT:
+          return java.sql.Types.FLOAT;
+      case DataType.DOUBLE:
+          return java.sql.Types.DOUBLE;
+      case DataType.BOOLEAN:
+          return java.sql.Types.BOOLEAN;
+      case DataType.DATETIME:
+          return java.sql.Types.DATE;
+      case DataType.BYTEARRAY:
+      case DataType.CHARARRAY:
+      case DataType.BYTE:
+          return java.sql.Types.VARCHAR;
+      default:
+          log.warn("Can not find SQL data type for " + pigDataType + " returning VARCHAR");
+          return java.sql.Types.VARCHAR;
+      }
+  }
+
   class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> {
 
     @Override
@@ -250,18 +290,18 @@ public class DBStorage extends StoreFunc
 
     @Override
     public RecordWriter<NullWritable, NullWritable> getRecordWriter(
-        TaskAttemptContext context) throws IOException, InterruptedException {
+      TaskAttemptContext context) throws IOException, InterruptedException {
       // We don't use a record writer to write to database
-    	return new RecordWriter<NullWritable, NullWritable>() {
-    		   	  @Override
-    		   	  public void close(TaskAttemptContext context) {
-    		   		  // Noop
-    		    	  }
-    		    	  @Override
-    		    	  public void write(NullWritable k, NullWritable v) {
-    		    		  // Noop
-    		    	  }
-    		      };
+      return new RecordWriter<NullWritable, NullWritable>() {
+          @Override
+          public void close(TaskAttemptContext context) {
+              // Noop
+          }
+          @Override
+          public void write(NullWritable k, NullWritable v) {
+              // Noop
+          }
+      };
     }
 
   }
@@ -298,10 +338,39 @@ public class DBStorage extends StoreFunc
       throw new IOException("JDBC Error", e);
     }
     count = 0;
+
+    // Try to get the schema from the UDFContext object.
+    UDFContext udfc = UDFContext.getUDFContext();
+    Properties p =
+        udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+    String strSchema = p.getProperty(SCHEMA_SIGNATURE);
+    if (strSchema != null) {
+        // Parse the schema from the string stored in the properties object.
+        schema = new ResourceSchema(Utils.getSchemaFromString(strSchema));
+    }
   }
 
   @Override
   public void setStoreLocation(String location, Job job) throws IOException {
     // IGNORE since we are writing records to DB.
   }
+
+  @Override
+  public void setStoreFuncUDFContextSignature(String signature) {
+      // store the signature so we can use it later
+      udfcSignature = signature;
+  }
+
+  @Override
+  public void checkSchema(ResourceSchema s) throws IOException {
+      // We won't really check the schema here, we'll store it in our
+      // UDFContext properties object so we have it when we need it on the
+      // backend
+
+      UDFContext udfc = UDFContext.getUDFContext();
+      Properties p =
+          udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
+      p.setProperty(SCHEMA_SIGNATURE, s.toString());
+  }
+
 }
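
The schema hand-off above follows a common StoreFunc pattern: serialize the schema under a signature-scoped key on the frontend (checkSchema), then re-parse it on the backend (prepareToWrite). A minimal standalone sketch of that pattern (class name and property key are hypothetical, not from this commit):

    import java.io.IOException;
    import java.util.Properties;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.StoreFunc;
    import org.apache.pig.impl.util.UDFContext;
    import org.apache.pig.impl.util.Utils;

    public abstract class SchemaAwareStore extends StoreFunc {
        private static final String SCHEMA_KEY = "my.store.schema"; // hypothetical key
        private String signature;

        @Override
        public void setStoreFuncUDFContextSignature(String sig) {
            signature = sig; // the signature scopes our UDFContext properties
        }

        @Override
        public void checkSchema(ResourceSchema s) throws IOException {
            // Frontend: stash the schema string for the backend tasks.
            Properties p = UDFContext.getUDFContext()
                    .getUDFProperties(getClass(), new String[] { signature });
            p.setProperty(SCHEMA_KEY, s.toString());
        }

        // Backend (e.g. from prepareToWrite): recover the schema, if present.
        protected ResourceSchema recoverSchema() throws IOException {
            Properties p = UDFContext.getUDFContext()
                    .getUDFProperties(getClass(), new String[] { signature });
            String s = p.getProperty(SCHEMA_KEY);
            return (s == null) ? null : new ResourceSchema(Utils.getSchemaFromString(s));
        }
    }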

Modified: pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java Sun May 25 02:33:23 2014
@@ -52,19 +52,25 @@ public class TestOver {
         func = new Over("chararray");
         in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
         out = func.outputSchema(in);
-        assertEquals("{{chararray}}", out.toString());
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_1: {result: chararray}}", out.toString());
 
         // int
         func = new Over("Int");
         in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
         out = func.outputSchema(in);
-        assertEquals("{{int}}", out.toString());
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_2: {result: int}}", out.toString());
 
         // double
         func = new Over("DOUBLE");
         in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
         out = func.outputSchema(in);
-        assertEquals("{{double}}", out.toString());
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_3: {result: double}}", out.toString());
+
+        // named 
+        func = new Over("bob:chararray");
+        in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+        out = func.outputSchema(in);
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {bob: chararray}}", out.toString());
     }
 
     @Test

Modified: pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestDBStorage.java Sun May 25 02:33:23 2014
@@ -46,156 +46,173 @@ import junit.framework.TestCase;
 
 public class TestDBStorage extends TestCase {
 
-	private PigServer pigServer;
-	private MiniCluster cluster;
-	private Server dbServer;
-	private String driver = "org.hsqldb.jdbcDriver";
-	// private String url = "jdbc:hsqldb:mem:.";
+    private PigServer pigServer;
+    private MiniCluster cluster;
+    private Server dbServer;
+    private String driver = "org.hsqldb.jdbcDriver";
+    // private String url = "jdbc:hsqldb:mem:.";
     private String TMP_DIR;
     private String dblocation;
     private String url;
     private String dbUrl = "jdbc:hsqldb:hsql://localhost/" + "batchtest";
-	private String user = "sa";
-	private String password = "";
+    private String user = "sa";
+    private String password = "";
 
-	private static final String INPUT_FILE = "datafile.txt";
+    private static final String INPUT_FILE = "datafile.txt";
 
-	public TestDBStorage() throws ExecException, IOException {
-		// Initialise Pig server
-		cluster = MiniCluster.buildCluster();
-		pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-		pigServer.getPigContext().getProperties()
-				.setProperty("mapred.map.max.attempts", "1");
-		pigServer.getPigContext().getProperties()
-				.setProperty("mapred.reduce.max.attempts", "1");
-		System.out.println("Pig server initialized successfully");
+    public TestDBStorage() throws ExecException, IOException {
+        // Initialise Pig server
+        cluster = MiniCluster.buildCluster();
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer.getPigContext().getProperties()
+                .setProperty("mapred.map.max.attempts", "1");
+        pigServer.getPigContext().getProperties()
+                .setProperty("mapred.reduce.max.attempts", "1");
+        System.out.println("Pig server initialized successfully");
         TMP_DIR = System.getProperty("user.dir") + "/build/test/";
         dblocation = TMP_DIR + "batchtest";
         url = "jdbc:hsqldb:file:" + dblocation
                + ";hsqldb.default_table_type=cached;hsqldb.cache_rows=100";
-		// Initialise DBServer
-		dbServer = new Server();
-		dbServer.setDatabaseName(0, "batchtest");
-		// dbServer.setDatabasePath(0, "mem:test;sql.enforce_strict_size=true");
-		dbServer.setDatabasePath(0,
+        // Initialise DBServer
+        dbServer = new Server();
+        dbServer.setDatabaseName(0, "batchtest");
+        // dbServer.setDatabasePath(0, "mem:test;sql.enforce_strict_size=true");
+        dbServer.setDatabasePath(0,
                             "file:" + TMP_DIR + "batchtest;sql.enforce_strict_size=true");
-		dbServer.setLogWriter(null);
-		dbServer.setErrWriter(null);
-		dbServer.start();                                                                     
-		System.out.println("Database URL: " + dbUrl);     
-		try {
-			Class.forName(driver);
-		} catch (Exception e) {
-			e.printStackTrace();
-			System.out.println(this + ".setUp() error: " + e.getMessage());
-		}
-		System.out.println("Database server started on port: " + dbServer.getPort()); 
-	}
-
-	private void createFile() throws IOException {
-		PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
-		w = new PrintWriter(new FileWriter(INPUT_FILE));
-		w.println("100\tapple\t1.0\t2008-01-01");
-		w.println("100\torange\t2.0\t2008-02-01");
-		w.println("100\tbanana\t1.1\t2008-03-01");
-		w.close();
-		Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-	}
-
-	private void createTable() throws IOException {
-		Connection con = null;
-		String sql = "create table ttt (id integer, name varchar(32), ratio double, dt date)";
-		try {
-			con = DriverManager.getConnection(url, user, password);
-		} catch (SQLException sqe) {
-			throw new IOException("Unable to obtain a connection to the database",
-					sqe);
-		}
-		try {
-			Statement st = con.createStatement();
-			st.executeUpdate(sql);
-			st.close();
-			con.commit();
-			con.close();
-		} catch (SQLException sqe) {
-			throw new IOException("Cannot create table", sqe);
-		}
-	}
-
-	@Before
-	public void setUp() throws IOException {
-		createFile();
-		createTable();
-	}
-
-	@After
-	public void tearDown() throws IOException {
-		new File(INPUT_FILE).delete();
-		Util.deleteFile(cluster, INPUT_FILE);
-		pigServer.shutdown();
-		dbServer.stop();
-		cluster.shutDown();
-
-		File[] dbFiles = new File(TMP_DIR).listFiles(new FilenameFilter() {
-			@Override
-			public boolean accept(File dir, String name) {
-				if (name.startsWith("batchtest")) {
-					return true;
-				} else {
-					return false;
-				}
-			}
-		});
-		if (dbFiles != null) {
-			for (File file : dbFiles) {
-				file.delete();
-			}
-		}
-	}
-
-	public void testWriteToDB() throws IOException {
-		String insertQuery = "insert into ttt (id, name, ratio, dt) values (?,?,?,?)";
-		pigServer.setBatchOn();
-		String dbStore = "org.apache.pig.piggybank.storage.DBStorage('" + driver                                                                                                                       
-	            + "', '" + Util.encodeEscape(url) + "', '" + insertQuery + "');";
-		pigServer.registerQuery("A = LOAD '" + INPUT_FILE
-				+ "' as (id:int, fruit:chararray, ratio:double, dt : datetime);");
-		pigServer.registerQuery("STORE A INTO 'dummy' USING " + dbStore);
-	  ExecJob job = pigServer.executeBatch().get(0);
-		try {
-			while(!job.hasCompleted()) Thread.sleep(1000);
-		} catch(InterruptedException ie) {// ignore
-		}
-	  
-		assertNotSame("Failed: " + job.getException(), job.getStatus(),
-						ExecJob.JOB_STATUS.FAILED);
-		
-		Connection con = null;
-		String selectQuery = "select id, name, ratio, dt from ttt order by name";
-		try {
-			con = DriverManager.getConnection(url, user, password);
-		} catch (SQLException sqe) {
-			throw new IOException(
-					"Unable to obtain database connection for data verification", sqe);
-		}
-		try {
-			PreparedStatement ps = con.prepareStatement(selectQuery);
-			ResultSet rs = ps.executeQuery();
-
-			int expId = 100;
-			String[] expNames = { "apple", "banana", "orange" };
-			double[] expRatios = { 1.0, 1.1, 2.0 };
+        dbServer.setLogWriter(null);
+        dbServer.setErrWriter(null);
+        dbServer.start();
+        System.out.println("Database URL: " + dbUrl);
+        try {
+            Class.forName(driver);
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println(this + ".setUp() error: " + e.getMessage());
+        }
+        System.out.println("Database server started on port: " + dbServer.getPort());
+    }
+
+    private void createFile() throws IOException {
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+        w = new PrintWriter(new FileWriter(INPUT_FILE));
+        w.println("100\tapple\t1.0\t2008-01-01");
+        w.println("100\torange\t2.0\t2008-02-01");
+        w.println("100\tbanana\t1.1\t2008-03-01");
+        w.println("\t\t\t");
+        w.close();
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+    }
+
+    private void createTable() throws IOException {
+        Connection con = null;
+        String sql = "create table ttt (id integer, name varchar(32), ratio double, dt date)";
+        try {
+            con = DriverManager.getConnection(url, user, password);
+        } catch (SQLException sqe) {
+            throw new IOException("Unable to obtain a connection to the database",
+                    sqe);
+        }
+        try {
+            Statement st = con.createStatement();
+            st.executeUpdate(sql);
+            st.close();
+            con.commit();
+            con.close();
+        } catch (SQLException sqe) {
+            throw new IOException("Cannot create table", sqe);
+        }
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        createFile();
+        createTable();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        new File(INPUT_FILE).delete();
+        Util.deleteFile(cluster, INPUT_FILE);
+        pigServer.shutdown();
+        dbServer.stop();
+        cluster.shutDown();
+
+        File[] dbFiles = new File(TMP_DIR).listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(File dir, String name) {
+                if (name.startsWith("batchtest")) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        });
+        if (dbFiles != null) {
+            for (File file : dbFiles) {
+                file.delete();
+            }
+        }
+    }
+
+    public void testWriteToDB() throws IOException {
+        String insertQuery = "insert into ttt (id, name, ratio, dt) values (?,?,?,?)";
+        pigServer.setBatchOn();
+        String dbStore = "org.apache.pig.piggybank.storage.DBStorage('" + driver
+                + "', '" + Util.encodeEscape(url) + "', '" + insertQuery + "');";
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE
+                + "' as (id:int, fruit:chararray, ratio:double, dt : datetime);");
+        pigServer.registerQuery("STORE A INTO 'dummy' USING " + dbStore);
+      ExecJob job = pigServer.executeBatch().get(0);
+        try {
+            while(!job.hasCompleted()) Thread.sleep(1000);
+        } catch(InterruptedException ie) {// ignore
+        }
+
+        assertNotSame("Failed: " + job.getException(), job.getStatus(),
+                        ExecJob.JOB_STATUS.FAILED);
+
+        Connection con = null;
+        String selectQuery = "select id, name, ratio, dt from ttt order by name";
+        try {
+            con = DriverManager.getConnection(url, user, password);
+        } catch (SQLException sqe) {
+            throw new IOException(
+                    "Unable to obtain database connection for data verification", sqe);
+        }
+        try {
+            PreparedStatement ps = con.prepareStatement(selectQuery);
+            ResultSet rs = ps.executeQuery();
+
+            int expId = 100;
+            String[] expNames = { "apple", "banana", "orange" };
+            double[] expRatios = { 1.0, 1.1, 2.0 };
                         Date []  expDates = {new Date(2008,01,01),new Date(2008,02,01),new Date(2008,03,01)};
-			for (int i = 0; i < 3 && rs.next(); i++) {
-				assertEquals("Id mismatch", expId, rs.getInt(1));
-				assertEquals("Name mismatch", expNames[i], rs.getString(2));
-				assertEquals("Ratio mismatch", expRatios[i], rs.getDouble(3), 0.0001);
-				assertEquals("Date mismatch", expDates[i], rs.getDate(4));
-			}
-
-		} catch (SQLException sqe) {
-			throw new IOException(
-					"Unable to read data from database for verification", sqe);
-		}
-	}
+            for (int i = 0; i < 4 && rs.next(); i++) {
+                //Need to check for nulls explicitly.
+                if ( i == 0) {
+                    //Id
+                    rs.getInt(1);
+                    assertTrue(rs.wasNull());
+                    //Name
+                    rs.getString(2);
+                    assertTrue(rs.wasNull());
+                    //Ratio
+                    rs.getDouble(3);
+                    assertTrue(rs.wasNull());
+                    //Date
+                    rs.getDate(4);
+                    assertTrue(rs.wasNull());
+                } else {
+                    assertEquals("Id mismatch", expId, rs.getInt(1));
+                    assertEquals("Name mismatch", expNames[i-1], rs.getString(2));
+                    assertEquals("Ratio mismatch", expRatios[i-1], rs.getDouble(3), 0.0001);
+                    assertEquals("Date mismatch", expDates[i-1], rs.getDate(4));
+                }
+            }
+
+        } catch (SQLException sqe) {
+            throw new IOException(
+                    "Unable to read data from database for verification", sqe);
+        }
+    }
 }

Modified: pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Sun May 25 02:33:23 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.hadoop.mapred.TaskReport;
@@ -32,6 +33,7 @@ import org.apache.hadoop.mapreduce.JobID
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop20.PigJobControl;
@@ -53,9 +55,9 @@ public class HadoopShims {
     }
 
     static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
-                                TaskAttemptID taskId) {
+            TaskAttemptID taskId) {
         TaskAttemptContext newContext = new TaskAttemptContext(conf,
-            taskId);
+                taskId);
         return newContext;
     }
 
@@ -95,7 +97,7 @@ public class HadoopShims {
     }
 
     public static JobControl newJobControl(String groupName, int timeToSleep) {
-      return new PigJobControl(groupName, timeToSleep);
+        return new PigJobControl(groupName, timeToSleep);
     }
 
     public static long getDefaultBlockSize(FileSystem fs, Path path) {
@@ -148,4 +150,37 @@ public class HadoopShims {
         return true;
     }
 
+    /**
+     * Returns the progress of a Job j which is part of a submitted JobControl
+     * object. The progress is for this Job. So it has to be scaled down by the
+     * num of jobs that are present in the JobControl.
+     *
+     * @param j The Job for which progress is required
+     * @return Returns the percentage progress of this Job
+     * @throws IOException
+     */
+    public static double progressOfRunningJob(Job j)
+            throws IOException {
+        RunningJob rj = j.getJobClient().getJob(j.getAssignedJobID());
+        if (rj == null && j.getState() == Job.SUCCESS)
+            return 1;
+        else if (rj == null)
+            return 0;
+        else {
+            return (rj.mapProgress() + rj.reduceProgress()) / 2;
+        }
+    }
+
+    public static void killJob(Job job) throws IOException {
+        RunningJob runningJob = job.getJobClient().getJob(job.getAssignedJobID());
+        if (runningJob != null)
+            runningJob.killJob();
+    }
+
+    public static TaskReport[] getTaskReports(Job job, TaskType type) throws IOException {
+        JobClient jobClient = job.getJobClient();
+        return (type == TaskType.MAP)
+                ? jobClient.getMapTaskReports(job.getAssignedJobID())
+                        : jobClient.getReduceTaskReports(job.getAssignedJobID());
+    }
 }
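
The javadoc above notes that each job's progress has to be scaled by the number of jobs in the JobControl; a minimal sketch of that calculation, mirroring Launcher.calculateProgress from this commit (helper class name is illustrative):

    import java.io.IOException;
    import org.apache.hadoop.mapred.jobcontrol.Job;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;
    import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;

    public class ProgressDemo {
        // Completed jobs count as 1.0 each; each running job contributes
        // its (map + reduce) / 2 fraction via the shim.
        public static double overallProgress(JobControl jc, int totalJobs) throws IOException {
            double prog = jc.getSuccessfulJobs().size();
            for (Job j : jc.getRunningJobs()) {
                prog += HadoopShims.progressOfRunningJob(j);
            }
            return prog / totalJobs;
        }
    }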

Modified: pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Sun May 25 02:33:23 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.DowngradeHelper;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TIPStatus;
 import org.apache.hadoop.mapred.TaskReport;
@@ -55,7 +56,7 @@ public class HadoopShims {
     }
 
     static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
-                                TaskAttemptID taskId) {
+            TaskAttemptID taskId) {
         if (conf instanceof JobConf) {
             return new TaskAttemptContextImpl(new JobConf(conf), taskId);
         } else {
@@ -115,8 +116,12 @@ public class HadoopShims {
         return fs.getDefaultBlockSize(path);
     }
 
-    public static Counters getCounters(Job job) throws IOException, InterruptedException {
-        return new Counters(job.getJob().getCounters());
+    public static Counters getCounters(Job job) throws IOException {
+        try {
+            return new Counters(job.getJob().getCounters());
+        } catch (Exception ir) {
+            throw new IOException(ir);
+        }
     }
 
     public static boolean isJobFailed(TaskReport report) {
@@ -177,4 +182,43 @@ public class HadoopShims {
         return true;
     }
 
+    /**
+     * Returns the progress of a Job j which is part of a submitted JobControl
+     * object. The progress is for this Job. So it has to be scaled down by the
+     * num of jobs that are present in the JobControl.
+     *
+     * @param j The Job for which progress is required
+     * @return Returns the percentage progress of this Job
+     * @throws IOException
+     */
+    public static double progressOfRunningJob(Job j)
+            throws IOException {
+        org.apache.hadoop.mapreduce.Job mrJob = j.getJob();
+        try {
+            return (mrJob.mapProgress() + mrJob.reduceProgress()) / 2;
+        } catch (Exception ir) {
+            return 0;
+        }
+    }
+
+    public static void killJob(Job job) throws IOException {
+        org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
+        try {
+            if (mrJob != null) {
+                mrJob.killJob();
+            }
+        } catch (Exception ir) {
+            throw new IOException(ir);
+        }
+    }
+
+    public static TaskReport[] getTaskReports(Job job, TaskType type) throws IOException {
+        org.apache.hadoop.mapreduce.Job mrJob = job.getJob();
+        try {
+            org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
+            return DowngradeHelper.downgradeTaskReports(reports);
+        } catch (InterruptedException ir) {
+            throw new IOException(ir);
+        }
+    }
 }

Modified: pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/branches/tez/src/docs/src/documentation/content/xdocs/perf.xml Sun May 25 02:33:23 2014
@@ -24,7 +24,7 @@
   
 <section id="profiling">
   <title>Timing your UDFs</title>
-  <p>The first step to improving performance and efficiency is measuring where the time is going. Pig provides a light-weight method for approximately measuring how much time is spent in different user-defined functions (UDFs) and Loaders. Simply set the pig.udf.profile property to true. This will cause new counters to be tracked for all Map-Reduce jobs generated by your script: approx_microsecs measures the approximate amount of time spent in a UDF, and approx_invocations measures the approximate number of times the UDF was invoked. Note that this may produce a large number of counters (two per UDF). Excessive amounts of counters can lead to poor JobTracker performance, so use this feature carefully, and preferably on a test cluster.</p>
+  <p>The first step to improving performance and efficiency is measuring where the time is going. Pig provides a light-weight method for approximately measuring how much time is spent in different user-defined functions (UDFs) and Loaders. Simply set the pig.udf.profile property to true. This will cause new counters to be tracked for all Map-Reduce jobs generated by your script: approx_microsecs measures the approximate amount of time spent in a UDF, and approx_invocations measures the approximate number of times the UDF was invoked. In addition, the frequency of profiling can be configured via the pig.udf.profile.frequency (by default, every 100th invocation). Note that this may produce a large number of counters (two per UDF). Excessive amounts of counters can lead to poor JobTracker performance, so use this feature carefully, and preferably on a test cluster.</p>
 </section>
 
 <!-- ================================================================== -->

Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Sun May 25 02:33:23 2014
@@ -51,7 +51,8 @@ public class PigConfiguration {
      * Controls whether execution time of Pig UDFs should be tracked.
      * This feature uses counters; use judiciously.
      */
-    public static final String TIME_UDFS_PROP = "pig.udf.profile";
+    public static final String TIME_UDFS = "pig.udf.profile";
+    public static final String TIME_UDFS_FREQUENCY = "pig.udf.profile.frequency";
 
     /**
      * This key must be set to true by the user for code generation to be used.

Modified: pig/branches/tez/src/org/apache/pig/PigConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConstants.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConstants.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConstants.java Sun May 25 02:33:23 2014
@@ -52,4 +52,9 @@ public class PigConstants {
      */
     public static final String PIG_LOCAL_CONF_PREFIX = "pig.local.";
 
+    /**
+     * Counter names used by pig.udf.profile
+     */
+    public static final String TIME_UDFS_INVOCATION_COUNTER = "approx_invocations";
+    public static final String TIME_UDFS_ELAPSED_TIME_COUNTER = "approx_microsecs";
 }
\ No newline at end of file

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Sun May 25 02:33:23 2014
@@ -254,46 +254,18 @@ public abstract class Launcher {
      * @return The progress as a precentage in double format
      * @throws IOException
      */
-    protected double calculateProgress(JobControl jc, JobClient jobClient)
+    protected double calculateProgress(JobControl jc)
             throws IOException {
         double prog = 0.0;
         prog += jc.getSuccessfulJobs().size();
 
         List<Job> runnJobs = jc.getRunningJobs();
-        for (Object object : runnJobs) {
-            Job j = (Job) object;
-            prog += progressOfRunningJob(j, jobClient);
+        for (Job j : runnJobs) {
+            prog += HadoopShims.progressOfRunningJob(j);
         }
         return prog;
     }
 
-    /**
-     * Returns the progress of a Job j which is part of a submitted JobControl
-     * object. The progress is for this Job. So it has to be scaled down by the
-     * num of jobs that are present in the JobControl.
-     *
-     * @param j
-     *            - The Job for which progress is required
-     * @param jobClient
-     *            - the JobClient to which it has been submitted
-     * @return Returns the percentage progress of this Job
-     * @throws IOException
-     */
-    protected double progressOfRunningJob(Job j, JobClient jobClient)
-            throws IOException {
-        JobID mrJobID = j.getAssignedJobID();
-        RunningJob rj = jobClient.getJob(mrJobID);
-        if (rj == null && j.getState() == Job.SUCCESS)
-            return 1;
-        else if (rj == null)
-            return 0;
-        else {
-            double mapProg = rj.mapProgress();
-            double redProg = rj.reduceProgress();
-            return (mapProg + redProg) / 2;
-        }
-    }
-
     public long getTotalHadoopTimeSpent() {
         return totalHadoopTimeSpent;
     }
@@ -320,7 +292,7 @@ public abstract class Launcher {
 
     /**
      *
-     * @param stackTraceLine
+     * @param stackTrace
      *            The string representation of
      *            {@link Throwable#printStackTrace() printStackTrace} Handles
      *            internal PigException and its subclasses that override the
@@ -357,7 +329,7 @@ public abstract class Launcher {
 
     /**
      *
-     * @param stackTraceLine
+     * @param stackTraceLines
      *            An array of strings that represent
      *            {@link Throwable#printStackTrace() printStackTrace} output,
      *            split by newline

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Sun May 25 02:33:23 2014
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigRunner.ReturnCode;
@@ -103,9 +104,7 @@ public class MapReduceLauncher extends L
             log.debug("Receive kill signal");
             if (jc!=null) {
                 for (Job job : jc.getRunningJobs()) {
-                    RunningJob runningJob = job.getJobClient().getJob(job.getAssignedJobID());
-                    if (runningJob!=null)
-                        runningJob.killJob();
+                    HadoopShims.killJob(job);
                     log.info("Job " + job.getAssignedJobID() + " killed");
                 }
             }
@@ -343,7 +342,7 @@ public class MapReduceLauncher extends L
                     }
                     jobsWithoutIds.removeAll(jobsAssignedIdInThisRun);
 
-                    double prog = (numMRJobsCompl+calculateProgress(jc, statsJobClient))/totalMRJobs;
+                    double prog = (numMRJobsCompl+calculateProgress(jc))/totalMRJobs;
                     if (notifyProgress(prog, lastProg)) {
                         List<Job> runnJobs = jc.getRunningJobs();
                         if (runnJobs != null) {
@@ -455,7 +454,7 @@ public class MapReduceLauncher extends L
             Exception backendException = null;
             for (Job fj : failedJobs) {
                 try {
-                    getStats(fj, statsJobClient, true, pc);
+                    getStats(fj, true, pc);
                 } catch (Exception e) {
                     backendException = e;
                 }
@@ -495,9 +494,9 @@ public class MapReduceLauncher extends L
                     }
                 }
 
-                getStats(job, statsJobClient, false, pc);
+                getStats(job, false, pc);
                 if (aggregateWarning) {
-                    computeWarningAggregate(job, statsJobClient, warningAggMap);
+                    computeWarningAggregate(job, warningAggMap);
                 }
             }
 
@@ -640,8 +639,8 @@ public class MapReduceLauncher extends L
         comp.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
 
         String lastInputChunkSize =
-            pc.getProperties().getProperty(
-                "last.input.chunksize", JoinPackager.DEFAULT_CHUNK_SIZE);
+                pc.getProperties().getProperty(
+                        "last.input.chunksize", JoinPackager.DEFAULT_CHUNK_SIZE);
 
         String prop = pc.getProperties().getProperty(PigConfiguration.PROP_NO_COMBINER);
         if (!pc.inIllustrator && !("true".equals(prop)))  {
@@ -747,47 +746,41 @@ public class MapReduceLauncher extends L
     }
 
     @SuppressWarnings("deprecation")
-    void computeWarningAggregate(Job job, JobClient jobClient, Map<Enum, Long> aggMap) {
-        JobID mapRedJobID = job.getAssignedJobID();
-        RunningJob runningJob = null;
+    void computeWarningAggregate(Job job, Map<Enum, Long> aggMap) {
         try {
-            runningJob = jobClient.getJob(mapRedJobID);
-            if(runningJob != null) {
-                Counters counters = runningJob.getCounters();
-                if (counters==null)
-                {
-                    long nullCounterCount = aggMap.get(PigWarning.NULL_COUNTER_COUNT)==null?0 : aggMap.get(PigWarning.NULL_COUNTER_COUNT);
-                    nullCounterCount++;
-                    aggMap.put(PigWarning.NULL_COUNTER_COUNT, nullCounterCount);
-                }
-                try {
-                    for (Enum e : PigWarning.values()) {
-                        if (e != PigWarning.NULL_COUNTER_COUNT) {
-                            Long currentCount = aggMap.get(e);
-                            currentCount = (currentCount == null ? 0 : currentCount);
-                            // This code checks if the counters is null, if it is,
-                            // we need to report to the user that the number
-                            // of warning aggregations may not be correct. In fact,
-                            // Counters should not be null, it is
-                            // a hadoop bug, once this bug is fixed in hadoop, the
-                            // null handling code should never be hit.
-                            // See Pig-943
-                            if (counters != null)
-                                currentCount += counters.getCounter(e);
-                            aggMap.put(e, currentCount);
-                        }
-                    }
-                } catch (Exception e) {
-                    log.warn("Exception getting counters.", e);
+            Counters counters = HadoopShims.getCounters(job);
+            if (counters==null)
+            {
+                long nullCounterCount =
+                        (aggMap.get(PigWarning.NULL_COUNTER_COUNT) == null)
+                          ? 0
+                          : aggMap.get(PigWarning.NULL_COUNTER_COUNT);
+                nullCounterCount++;
+                aggMap.put(PigWarning.NULL_COUNTER_COUNT, nullCounterCount);
+            }
+            for (Enum e : PigWarning.values()) {
+                if (e != PigWarning.NULL_COUNTER_COUNT) {
+                    Long currentCount = aggMap.get(e);
+                    currentCount = (currentCount == null ? 0 : currentCount);
+                    // This code checks if the counters is null, if it is,
+                    // we need to report to the user that the number
+                    // of warning aggregations may not be correct. In fact,
+                    // Counters should not be null, it is
+                    // a hadoop bug, once this bug is fixed in hadoop, the
+                    // null handling code should never be hit.
+                    // See Pig-943
+                    if (counters != null)
+                        currentCount += counters.getCounter(e);
+                    aggMap.put(e, currentCount);
                 }
             }
-        } catch (IOException ioe) {
+        } catch (Exception e) {
             String msg = "Unable to retrieve job to compute warning aggregation.";
             log.warn(msg);
         }
     }
 
-    private void getStats(Job job, JobClient jobClient, boolean errNotDbg,
+    private void getStats(Job job, boolean errNotDbg,
             PigContext pigContext) throws ExecException {
         JobID MRJobID = job.getAssignedJobID();
         String jobMessage = job.getMessage();
@@ -809,11 +802,11 @@ public class MapReduceLauncher extends L
             throw new ExecException(backendException);
         }
         try {
-            TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID);
+            TaskReport[] mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
             getErrorMessages(mapRep, "map", errNotDbg, pigContext);
             totalHadoopTimeSpent += computeTimeSpent(mapRep);
             mapRep = null;
-            TaskReport[] redRep = jobClient.getReduceTaskReports(MRJobID);
+            TaskReport[] redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
             getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
             totalHadoopTimeSpent += computeTimeSpent(redRep);
             redRep = null;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Sun May 25 02:33:23 2014
@@ -17,7 +17,9 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
-import static org.apache.pig.PigConfiguration.TIME_UDFS_PROP;
+import static org.apache.pig.PigConfiguration.TIME_UDFS;
+import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY;
+import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -54,10 +56,8 @@ public class PigRecordReader extends Rec
 
     private static final Log LOG = LogFactory.getLog(PigRecordReader.class);
 
-    private final static String TIMING_COUNTER = "approx_microsecs";
-    private final static long TIMING_FREQ = 100;
-
     transient private String counterGroup = "";
+    private long timingFrequency = 100L;
     private boolean doTiming = false;
 
     /**
@@ -119,8 +119,11 @@ public class PigRecordReader extends Rec
         idx = 0;
         this.limit = limit;
         initNextRecordReader();
-        counterGroup = loadFunc.toString();
-        doTiming = context.getConfiguration().getBoolean(TIME_UDFS_PROP, false);
+        doTiming = inputSpecificConf.getBoolean(TIME_UDFS, false);
+        if (doTiming) {
+            counterGroup = loadFunc.toString();
+            timingFrequency = inputSpecificConf.getLong(TIME_UDFS_FREQUENCY, 100L);
+        }
     }
 
     @Override
@@ -193,7 +196,7 @@ public class PigRecordReader extends Rec
 
         if (limit != -1 && recordCount >= limit)
             return false;
-        boolean timeThis = doTiming && ( (recordCount + 1) % TIMING_FREQ == 0);
+        boolean timeThis = doTiming && ( (recordCount + 1) % timingFrequency == 0);
         long startNanos = 0;
         if (timeThis) {
             startNanos = System.nanoTime();
@@ -204,8 +207,8 @@ public class PigRecordReader extends Rec
             }
         }
         if (timeThis) {
-            reporter.incrCounter(counterGroup, TIMING_COUNTER,
-                    Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
+            reporter.incrCounter(counterGroup, TIME_UDFS_ELAPSED_TIME_COUNTER,
+                    Math.round((System.nanoTime() - startNanos) / 1000) * timingFrequency);
         }
         recordCount++;
         return true;

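The reader now times only one in every timingFrequency records and scales the measured microseconds back up, so the counter approximates total read time at a fraction of the System.nanoTime() overhead. A self-contained sketch of the sampling pattern (class and method names here are illustrative, not Pig's API):

    import java.util.concurrent.Callable;

    // Times 1 in every 'frequency' calls and scales each sample by
    // 'frequency', yielding an approximate total in microseconds.
    public class SampledTimer {
        private final long frequency;
        private long count = 0;
        private long approxMicros = 0;

        public SampledTimer(long frequency) {
            this.frequency = frequency;
        }

        public <T> T time(Callable<T> work) throws Exception {
            boolean timeThis = (count++ % frequency == 0);
            if (!timeThis) {
                return work.call();
            }
            long start = System.nanoTime();
            T result = work.call();
            // one timed call stands in for 'frequency' untimed ones
            approxMicros += ((System.nanoTime() - start) / 1000) * frequency;
            return result;
        }

        public long getApproxMicros() {
            return approxMicros;
        }
    }
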
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Sun May 25 02:33:23 2014
@@ -18,7 +18,10 @@
 
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
-import static org.apache.pig.PigConfiguration.TIME_UDFS_PROP;
+import static org.apache.pig.PigConfiguration.TIME_UDFS;
+import static org.apache.pig.PigConfiguration.TIME_UDFS_FREQUENCY;
+import static org.apache.pig.PigConstants.TIME_UDFS_INVOCATION_COUNTER;
+import static org.apache.pig.PigConstants.TIME_UDFS_ELAPSED_TIME_COUNTER;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -58,19 +61,13 @@ import org.apache.pig.impl.util.UDFConte
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class POUserFunc extends ExpressionOperator {
+    private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(POUserFunc.class);
-    private final static String TIMING_COUNTER = "approx_microsecs";
-    private final static String INVOCATION_COUNTER = "approx_invocations";
-    private final static long TIMING_FREQ = 100;
-    private final static TupleFactory tf = TupleFactory.getInstance();
+    private static final TupleFactory tf = TupleFactory.getInstance();
 
     private transient String counterGroup;
-    /**
-     *
-     */
-    private static final long serialVersionUID = 1L;
-    transient EvalFunc func;
-    transient private String[] cacheFiles = null;
+    private transient EvalFunc func;
+    private transient String[] cacheFiles = null;
 
     FuncSpec funcSpec;
     FuncSpec origFSpec;
@@ -86,6 +83,7 @@ public class POUserFunc extends Expressi
     private boolean haveCheckedIfTerminatingAccumulator;
 
     private long numInvocations = 0L;
+    private long timingFrequency = 100L;
     private boolean doTiming = false;
 
     public PhysicalOperator getReferencedOperator() {
@@ -97,9 +95,7 @@ public class POUserFunc extends Expressi
     }
 
     public POUserFunc(OperatorKey k, int rp, List<PhysicalOperator> inp) {
-        super(k, rp);
-        inputs = inp;
-
+        this(k, rp, inp, null);
     }
 
     public POUserFunc(
@@ -157,8 +153,11 @@ public class POUserFunc extends Expressi
             func.setPigLogger(pigLogger);
             Configuration jobConf = UDFContext.getUDFContext().getJobConf();
             if (jobConf != null) {
-                doTiming = "true".equalsIgnoreCase(jobConf.get(TIME_UDFS_PROP, "false"));
-                counterGroup = funcSpec.toString();
+                doTiming = jobConf.getBoolean(TIME_UDFS, false);
+                if (doTiming) {
+                    counterGroup = funcSpec.toString();
+                    timingFrequency = jobConf.getLong(TIME_UDFS_FREQUENCY, 100L);
+                }
             }
             // We initialize here instead of instantiateFunc because this is called
             // when actual processing has begun, whereas a function can be instantiated
@@ -270,10 +269,10 @@ public class POUserFunc extends Expressi
     private Result getNext() throws ExecException {
         Result result = processInput();
         long startNanos = 0;
-        boolean timeThis = doTiming && (numInvocations++ % TIMING_FREQ == 0);
+        boolean timeThis = doTiming && (numInvocations++ % timingFrequency == 0);
         if (timeThis) {
             startNanos = System.nanoTime();
-            PigStatusReporter.getInstance().incrCounter(counterGroup, INVOCATION_COUNTER, TIMING_FREQ);
+            PigStatusReporter.getInstance().incrCounter(counterGroup, TIME_UDFS_INVOCATION_COUNTER, timingFrequency);
         }
         try {
             if(result.returnStatus == POStatus.STATUS_OK) {
@@ -354,8 +353,8 @@ public class POUserFunc extends Expressi
                 }
             }
             if (timeThis) {
-                PigStatusReporter.getInstance().incrCounter(counterGroup, TIMING_COUNTER,
-                        Math.round((System.nanoTime() - startNanos) / 1000) * TIMING_FREQ);
+                PigStatusReporter.getInstance().incrCounter(counterGroup, TIME_UDFS_ELAPSED_TIME_COUNTER,
+                        Math.round((System.nanoTime() - startNanos) / 1000) * timingFrequency);
             }
             return result;
         } catch (ExecException ee) {

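POUserFunc maintains two sampled counters per UDF, approximate invocations and approximate elapsed microseconds; both are scaled by the same timingFrequency, so their ratio gives an estimate of the mean latency per call. A quick worked example (counter values are hypothetical):

    // Hypothetical counter readings for one UDF's counter group:
    long approxInvocations = 100000L;  // invocation counter
    long approxMicros = 2500000L;      // elapsed-time counter
    // Estimated mean latency: 2500000 / 100000 = 25 microseconds/call.
    double meanMicrosPerCall = (double) approxMicros / approxInvocations;
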
Modified: pig/branches/tez/src/org/apache/pig/builtin/JsonMetadata.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/JsonMetadata.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/JsonMetadata.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/JsonMetadata.java Sun May 25 02:33:23 2014
@@ -122,9 +122,7 @@ public class JsonMetadata implements Loa
 
                     if (descriptor instanceof HFile) {
                         Path descriptorPath = ((HPath) descriptor).getPath();
-                        String fileName = descriptorPath.getName();
                         Path parent = descriptorPath.getParent();
-                        String parentName = parent.toString();
                         container = new HDirectory((HDataStorage)storage,parent);
                     } else { // descriptor instanceof HDirectory
                         container = (HDirectory)descriptor;
@@ -314,10 +312,11 @@ public class JsonMetadata implements Loa
                 OutputStream os = headerFilePath.create();
                 try {
                     String[] names = schema.fieldNames();
-
+                    String fn;
                     for (int i=0; i < names.length; i++) {
-                        os.write(names[i].getBytes("UTF-8"));
-                        if (i <names.length-1) {
+                        fn = ( (names[i] == null) ? ("$"+i) : names[i] );
+                        os.write(fn.getBytes("UTF-8"));
+                        if (i < names.length-1) {
                             os.write(fieldDel);
                         } else {
                             os.write(recordDel);

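With this fix, JsonMetadata's header writer falls back to Pig's positional $n convention for unnamed fields instead of hitting a NullPointerException. A condensed illustration of the fallback:

    // Schema field names, with the second field unnamed:
    String[] names = { "a", null, "c" };
    StringBuilder header = new StringBuilder();
    for (int i = 0; i < names.length; i++) {
        header.append(names[i] == null ? "$" + i : names[i]);
        header.append(i < names.length - 1 ? '\t' : '\n');
    }
    // header.toString() -> "a\t$1\tc\n"
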
Modified: pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java Sun May 25 02:33:23 2014
@@ -58,6 +58,7 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.datastorage.HPath;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.Utils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -462,7 +463,7 @@ public class FileLocalizer {
             throws DataStorageException {
 
         if (relativeRoot.get() == null) {
-            String tdir= pigContext.getProperties().getProperty(PigConfiguration.PIG_TEMP_DIR, "/tmp");
+            String tdir= Utils.substituteVars(pigContext.getProperties().getProperty(PigConfiguration.PIG_TEMP_DIR, "/tmp"));
             ContainerDescriptor relative = pigContext.getDfs().asContainer(tdir + "/temp" + r.nextInt());
             relativeRoot.set(relative);
             try {

Modified: pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java Sun May 25 02:33:23 2014
@@ -194,6 +194,6 @@ public abstract class PigNullableWritabl
 
     @Override
     public String toString() {
-        return "Null: " + mNull + " index: " + mIndex + " " + mValue.toString();
+        return "Null: " + mNull + " index: " + mIndex + (mNull ? "" : " " + mValue.toString());
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/Utils.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/Utils.java Sun May 25 02:33:23 2014
@@ -35,6 +35,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -583,4 +585,33 @@ public class Utils {
         return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
     }
 
+    // PIG-3929 use parameter substitution for pig properties similar to Hadoop Configuration
+    // Following code has been borrowed from Hadoop's Configuration#substituteVars
+    private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
+    private static int MAX_SUBST = 20;
+
+    public static String substituteVars(String expr) {
+        if (expr == null) {
+            return null;
+        }
+        Matcher match = varPat.matcher("");
+        String eval = expr;
+        for(int s=0; s<MAX_SUBST; s++) {
+            match.reset(eval);
+            if (!match.find()) {
+                return eval;
+            }
+            String var = match.group();
+            var = var.substring(2, var.length()-1); // remove ${ .. }
+            String val = null;
+            val = System.getProperty(var);
+            if (val == null) {
+                return eval; // return literal ${var}: var is unbound
+            }
+            // substitute
+            eval = eval.substring(0, match.start())+val+eval.substring(match.end());
+        }
+        throw new IllegalStateException("Variable substitution depth too large: " 
+                + MAX_SUBST + " " + expr);
+    }
 }

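With substituteVars in place (PIG-3929), properties such as pig.temp.dir may reference JVM system properties, mirroring Hadoop's Configuration behavior. A brief usage sketch (the unbound property name below is made up for illustration):

    // Expands ${...} references against system properties:
    String tmp = Utils.substituteVars("/tmp/${user.name}/pig");
    // -> "/tmp/alice/pig" when user.name=alice

    // Unbound variables are returned literally rather than failing:
    String raw = Utils.substituteVars("/tmp/${no.such.property}/pig");
    // -> "/tmp/${no.such.property}/pig"
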
Modified: pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Sun May 25 02:33:23 2014
@@ -261,7 +261,7 @@ TOKEN_MGR_DECLS : {
 	<"'"> {prevState = PIG_START;} : IN_STRING
 |	<"`"> {prevState = PIG_START;} : IN_COMMAND
 |	<(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = PIG_START;} : SCHEMA_DEFINITION
-|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "r" | "\n")+ > {prevState = PIG_START;} : GENERATE
+|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+ > {prevState = PIG_START;} : GENERATE
 |       <"{"> {pigBlockLevel = 1;} : IN_BLOCK
 |       <"}"> {if (true) throw new TokenMgrError("Unmatched '}'", TokenMgrError.LEXICAL_ERROR);}
 |       <";"> : PIG_END
@@ -364,7 +364,7 @@ TOKEN_MGR_DECLS : {
 {
 	<"\""> {prevState = IN_BLOCK;} : IN_DOUBLE_QUOTED_STRING
 |	<(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = IN_BLOCK;} : SCHEMA_DEFINITION
-|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "r" | "\n")+> {prevState = IN_BLOCK;} : GENERATE
+|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+> {prevState = IN_BLOCK;} : GENERATE
 |	<"{"> {pigBlockLevel++;}
 |       <"}"(";")?> {pigBlockLevel--; if (pigBlockLevel == 0) SwitchTo(PIG_END);}
 |	<"'"> {prevState = IN_BLOCK;} : IN_STRING

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/InputStats.java Sun May 25 02:33:23 2014
@@ -84,7 +84,7 @@ public final class InputStats {
         return type;
     }
 
-    public String getDisplayString(boolean local) {
+    public String getDisplayString() {
         StringBuilder sb = new StringBuilder();
         if (success) {
             sb.append("Successfully ");
@@ -96,7 +96,7 @@ public final class InputStats {
                 sb.append("read ");
             }
 
-            if (!local && records >= 0) {
+            if (records >= 0) {
                 sb.append(records).append(" records ");
             } else {
                 sb.append("records ");

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/JobStats.java Sun May 25 02:33:23 2014
@@ -174,7 +174,7 @@ public abstract class JobStats extends O
         exception = e;
     }
 
-    public abstract String getDisplayString(boolean isLocal);
+    public abstract String getDisplayString();
 
 
     /**

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/OutputStats.java Sun May 25 02:33:23 2014
@@ -105,12 +105,12 @@ public final class OutputStats {
     public Configuration getConf() {
         return conf;
     }
-    
-    public String getDisplayString(boolean local) {
+
+    public String getDisplayString() {
         StringBuilder sb = new StringBuilder();
         if (success) {
             sb.append("Successfully stored ");
-            if (!local && records >= 0) {
+            if (records >= 0) {
                 sb.append(records).append(" records ");
             } else {
                 sb.append("records ");

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStats.java Sun May 25 02:33:23 2014
@@ -109,6 +109,7 @@ public abstract class PigStats {
         return errorThrowable;
     }
 
+    @Deprecated
     public abstract JobClient getJobClient();
 
     public abstract boolean isEmbedded();

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Sun May 25 02:33:23 2014
@@ -35,10 +35,13 @@ import org.apache.hadoop.mapred.JobClien
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.io.FileSpec;
@@ -71,9 +74,6 @@ public final class MRJobStats extends Jo
 
     public static final String FAILURE_HEADER = "JobId\tAlias\tFeature\tMessage\tOutputs";
 
-    // currently counters are not working in local mode - see PIG-1286
-    public static final String SUCCESS_HEADER_LOCAL = "JobId\tAlias\tFeature\tOutputs";
-
     private static final Log LOG = LogFactory.getLog(MRJobStats.class);
 
     private List<POStore> mapStores = null;
@@ -237,11 +237,20 @@ public final class MRJobStats extends Jo
         medianReduceTime = median;
     }
 
+    private static void appendStat(long stat, StringBuilder sb) {
+        if(stat != -1) {
+            sb.append(stat/1000);
+        } else {
+            sb.append("n/a");
+        }
+        sb.append("\t");
+    }
+
     @Override
-    public String getDisplayString(boolean local) {
+    public String getDisplayString() {
         StringBuilder sb = new StringBuilder();
         String id = (jobId == null) ? "N/A" : jobId.toString();
-        if (state == JobState.FAILED || local) {
+        if (state == JobState.FAILED) {
             sb.append(id).append("\t")
                 .append(getAlias()).append("\t")
                 .append(getFeature()).append("\t");
@@ -252,22 +261,15 @@ public final class MRJobStats extends Jo
             sb.append(id).append("\t")
                 .append(numberMaps).append("\t")
                 .append(numberReduces).append("\t");
-            if (numberMaps == 0) {
-                sb.append("n/a\t").append("n/a\t").append("n/a\t").append("n/a\t");
-            } else {
-                sb.append(maxMapTime/1000).append("\t")
-                    .append(minMapTime/1000).append("\t")
-                    .append(avgMapTime/1000).append("\t")
-                    .append(medianMapTime/1000).append("\t");
-            }
-            if (numberReduces == 0) {
-                sb.append("n/a\t").append("n/a\t").append("n/a\t").append("n/a\t");
-            } else {
-                sb.append(maxReduceTime/1000).append("\t")
-                    .append(minReduceTime/1000).append("\t")
-                    .append(avgReduceTime/1000).append("\t")
-                    .append(medianReduceTime/1000).append("\t");
-            }
+            appendStat(maxMapTime, sb);
+            appendStat(minMapTime, sb);
+            appendStat(avgMapTime, sb);
+            appendStat(medianMapTime, sb);
+            appendStat(maxReduceTime, sb);
+            appendStat(minReduceTime, sb);
+            appendStat(avgReduceTime, sb);
+            appendStat(medianReduceTime, sb);
+
             sb.append(getAlias()).append("\t")
                 .append(getFeature()).append("\t");
         }
@@ -278,13 +280,11 @@ public final class MRJobStats extends Jo
         return sb.toString();
     }
 
-    void addCounters(RunningJob rjob) {
-        if (rjob != null) {
-            try {
-                counters = rjob.getCounters();
-            } catch (IOException e) {
-                LOG.warn("Unable to get job counters", e);
-            }
+    void addCounters(Job job) {
+        try {
+            counters = HadoopShims.getCounters(job);
+        } catch (IOException e) {
+            LOG.warn("Unable to get job counters", e);
         }
         if (counters != null) {
             Counters.Group taskgroup = counters
@@ -331,10 +331,10 @@ public final class MRJobStats extends Jo
         }
     }
 
-    void addMapReduceStatistics(JobClient client, Configuration conf) {
+    void addMapReduceStatistics(Job job) {
         TaskReport[] maps = null;
         try {
-            maps = client.getMapTaskReports(jobId);
+            maps = HadoopShims.getTaskReports(job, TaskType.MAP);
         } catch (IOException e) {
             LOG.warn("Failed to get map task report", e);
         }
@@ -367,7 +367,7 @@ public final class MRJobStats extends Jo
 
         TaskReport[] reduces = null;
         try {
-            reduces = client.getReduceTaskReports(jobId);
+            reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
         } catch (IOException e) {
             LOG.warn("Failed to get reduce task report", e);
         }

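The new appendStat helper centralizes the display fallback: phase times are left at -1 when no task reports supplied real values, so jobs with zero maps or reduces now print "n/a" columns without the old numberMaps/numberReduces special cases. Condensed:

    // -1 is the "not measured" sentinel; anything else is millis.
    long[] stats = { 4200L, -1L };  // e.g. maxMapTime set, maxReduceTime not
    StringBuilder sb = new StringBuilder();
    for (long stat : stats) {
        sb.append(stat != -1 ? Long.toString(stat / 1000) : "n/a").append('\t');
    }
    // sb.toString() -> "4\tn/a\t"
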
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Sun May 25 02:33:23 2014
@@ -230,21 +230,9 @@ public class MRPigStatsUtil extends PigS
         } else {
             js.setSuccessful(true);
 
-            js.addMapReduceStatistics(ps.getJobClient(), job.getJobConf());
+            js.addMapReduceStatistics(job);
 
-            JobClient client = ps.getJobClient();
-            RunningJob rjob = null;
-            try {
-                rjob = client.getJob(job.getAssignedJobID());
-            } catch (IOException e) {
-                LOG.warn("Failed to get running job", e);
-            }
-            if (rjob == null) {
-                LOG.warn("Failed to get RunningJob for job "
-                        + job.getAssignedJobID());
-            } else {
-                js.addCounters(rjob);
-            }
+            js.addCounters(job);
 
             js.addOutputStatistics();
 

Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java Sun May 25 02:33:23 2014
@@ -172,6 +172,7 @@ public final class SimplePigStats extend
         return startTime > 0;
     }
 
+    @Deprecated
     @Override
     public JobClient getJobClient() {
         return jobClient;
@@ -215,20 +216,14 @@ public final class SimplePigStats extend
             return;
         }
 
-        // currently counters are not working in local mode - see PIG-1286
-        ExecType execType = pigContext.getExecType();
-        if (execType.isLocal()) {
-            LOG.info("Detected Local mode. Stats reported below may be incomplete");
-        }
-
         SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
         StringBuilder sb = new StringBuilder();
         sb.append("\nHadoopVersion\tPigVersion\tUserId\tStartedAt\tFinishedAt\tFeatures\n");
         sb.append(getHadoopVersion()).append("\t").append(getPigVersion()).append("\t")
-            .append(userId).append("\t")
-            .append(sdf.format(new Date(startTime))).append("\t")
-            .append(sdf.format(new Date(endTime))).append("\t")
-            .append(getFeatures()).append("\n");
+        .append(userId).append("\t")
+        .append(sdf.format(new Date(startTime))).append("\t")
+        .append(sdf.format(new Date(endTime))).append("\t")
+        .append(getFeatures()).append("\n");
         sb.append("\n");
         if (returnCode == ReturnCode.SUCCESS) {
             sb.append("Success!\n");
@@ -242,14 +237,10 @@ public final class SimplePigStats extend
         if (returnCode == ReturnCode.SUCCESS
                 || returnCode == ReturnCode.PARTIAL_FAILURE) {
             sb.append("Job Stats (time in seconds):\n");
-            if (execType.isLocal()) {
-                sb.append(MRJobStats.SUCCESS_HEADER_LOCAL).append("\n");
-            } else {
-                sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
-            }
+            sb.append(MRJobStats.SUCCESS_HEADER).append("\n");
             List<JobStats> arr = jobPlan.getSuccessfulJobs();
             for (JobStats js : arr) {
-                sb.append(js.getDisplayString(execType.isLocal()));
+                sb.append(js.getDisplayString());
             }
             sb.append("\n");
         }
@@ -259,31 +250,29 @@ public final class SimplePigStats extend
             sb.append(MRJobStats.FAILURE_HEADER).append("\n");
             List<JobStats> arr = jobPlan.getFailedJobs();
             for (JobStats js : arr) {
-                sb.append(js.getDisplayString(execType.isLocal()));
+                sb.append(js.getDisplayString());
             }
             sb.append("\n");
         }
         sb.append("Input(s):\n");
         for (InputStats is : getInputStats()) {
-            sb.append(is.getDisplayString(execType.isLocal()));
+            sb.append(is.getDisplayString());
         }
         sb.append("\n");
         sb.append("Output(s):\n");
         for (OutputStats ds : getOutputStats()) {
-            sb.append(ds.getDisplayString(execType.isLocal()));
+            sb.append(ds.getDisplayString());
         }
 
-        if (!(execType.isLocal())) {
-            sb.append("\nCounters:\n");
-            sb.append("Total records written : " + getRecordWritten()).append("\n");
-            sb.append("Total bytes written : " + getBytesWritten()).append("\n");
-            sb.append("Spillable Memory Manager spill count : "
-                    + getSMMSpillCount()).append("\n");
-            sb.append("Total bags proactively spilled: "
-                    + getProactiveSpillCountObjects()).append("\n");
-            sb.append("Total records proactively spilled: "
-                    + getProactiveSpillCountRecords()).append("\n");
-        }
+        sb.append("\nCounters:\n");
+        sb.append("Total records written : " + getRecordWritten()).append("\n");
+        sb.append("Total bytes written : " + getBytesWritten()).append("\n");
+        sb.append("Spillable Memory Manager spill count : "
+                + getSMMSpillCount()).append("\n");
+        sb.append("Total bags proactively spilled: "
+                + getProactiveSpillCountObjects()).append("\n");
+        sb.append("Total records proactively spilled: "
+                + getProactiveSpillCountRecords()).append("\n");
 
         sb.append("\nJob DAG:\n").append(jobPlan.toString());
 

Modified: pig/branches/tez/test/org/apache/pig/TestMain.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/TestMain.java?rev=1597371&r1=1597370&r2=1597371&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/TestMain.java (original)
+++ pig/branches/tez/test/org/apache/pig/TestMain.java Sun May 25 02:33:23 2014
@@ -24,10 +24,8 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.BufferedWriter;
-import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileWriter;
-import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
@@ -37,7 +35,6 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.parser.ParserException;
 import org.apache.pig.parser.SourceLocation;
 import org.apache.pig.test.TestPigRunner.TestNotificationListener;
-import org.apache.pig.test.Util;
 import org.apache.pig.tools.parameters.ParameterSubstitutionException;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.Test;
@@ -129,35 +126,6 @@ public class TestMain {
         }
     }
 
-    @Test
-    public void testParseInputScript() throws Exception {
-        File input = Util.createInputFile("tmp", "",
-                new String[]{"{(1,1.0)}\ttestinputstring1",
-                        "{(2,2.0)}\ttestinputstring1",
-                        "{(3,3.0)}\ttestinputstring1",
-                        "{(4,4.0)}\ttestinputstring1"}
-        );
-        File out = new File(System.getProperty("java.io.tmpdir")+"/testParseInputScriptOut");
-        File scriptFile = Util.createInputFile("pigScript", "",
-                new String[]{"A = load '"+input.getAbsolutePath()+"' as (a:{(x:chararray, y:float)}, b:chararray);",
-                        "B = foreach A generate\n" +
-                                "    b,\n" +
-                                "    (bag{tuple(long)}) a.x as ax:{(x:long)};",
-                        "store B into '"+out.getAbsolutePath()+"';"}
-        );
-
-        Main.run(new String[]{"-x", "local", scriptFile.getAbsolutePath()}, null);
-        BufferedReader file = new BufferedReader(new FileReader(new File(out.getAbsolutePath()+"/part-m-00000")));
-        String line;
-        int count = 0;
-        while(( line = file.readLine()) != null) {
-            count++;
-        }
-        assertEquals(4,count);
-        Util.deleteDirectory(new File(out.getAbsolutePath()));
-        assertTrue(!new File(out.getAbsolutePath()).exists());
-    }
-
     public static class TestNotificationListener2 extends TestNotificationListener {
         protected boolean hadArgs = false;
         public TestNotificationListener2() {}