Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/29 19:36:50 UTC

svn commit: r1591026 [1/2] - in /pig/branches/tez: ./ bin/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ contrib/piggybank/java/src/test/java/org/apach...

Author: cheolsoo
Date: Tue Apr 29 17:36:49 2014
New Revision: 1591026

URL: http://svn.apache.org/r1591026
Log:
Merge latest trunk changes

Added:
    pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/
      - copied from r1591006, pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/
    pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2
      - copied unchanged from r1591006, pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2
Modified:
    pig/branches/tez/   (props changed)
    pig/branches/tez/CHANGES.txt
    pig/branches/tez/bin/pig.cmd
    pig/branches/tez/build.xml
    pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
    pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
    pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
    pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
    pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java
    pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java
    pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java
    pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java
    pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java
    pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java
    pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java
    pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java
    pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java
    pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java
    pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
    pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    pig/branches/tez/src/pig-default.properties   (props changed)
    pig/branches/tez/test/org/apache/pig/TestMain.java
    pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java
    pig/branches/tez/test/org/apache/pig/test/TestFetch.java
    pig/branches/tez/test/org/apache/pig/test/TestMRJobStats.java
    pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2   (props changed)
    pig/branches/tez/test/perf/pigmix/bin/runpigmix.pl

Propchange: pig/branches/tez/
------------------------------------------------------------------------------
  Merged /pig/trunk:r1586886-1591006

Modified: pig/branches/tez/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/tez/CHANGES.txt?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/CHANGES.txt (original)
+++ pig/branches/tez/CHANGES.txt Tue Apr 29 17:36:49 2014
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
  
 INCOMPATIBLE CHANGES
 
+PIG-3898: Refactor PPNL for non-MR execution engine (cheolsoo)
+
 PIG-3485: Remove CastUtils.bytesToMap(byte[] b) method from LoadCaster interface (cheolsoo) 
 
 PIG-3419: Pluggable Execution Engine (achalsoni81 via cheolsoo)
@@ -30,6 +32,16 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
+
+PIG-3865: Remodel the XMLLoader to be faster and more maintainable (aseldawy via daijy)
+
+PIG-3737: Bundle dependent jars in distribution in %PIG_HOME%/lib folder (daijy)
+
+PIG-3771: Piggybank Avrostorage makes a lot of namenode calls in the backend (rohini)
+
+PIG-3851: Upgrade jline to 2.11 (daijy)
+
 PIG-3884: Move multi store counters to PigStatsUtil from MRPigStatsUtil (rohini)
 
 PIG-3591: Refactor POPackage to separate MR specific code from packaging (mwagner via cheolsoo)
@@ -105,6 +117,24 @@ PIG-3882: Multiquery off mode execution 
  
 BUG FIXES
 
+PIG-3909: Type Casting issue (daijy)
+
+PIG-3905: 0.12.1 release can't be built for Hadoop2 (daijy)
+
+PIG-3894: Datetime functions AddDuration, SubtractDuration and all Between functions don't check for null values in the input tuple (jennythompson via cheolsoo)
+
+PIG-3772: Syntax error when casting an inner schema of a bag and line break involved (ssvinarchukhorton via daijy)
+
+PIG-3889: Direct fetch doesn't set job submission timestamps (cheolsoo)
+
+PIG-3895: Pigmix run script has compilation error (rohini)
+
+PIG-3885: AccumuloStorage incompatible with Accumulo 1.6.0 (elserj via daijy)
+
+PIG-3888: Direct fetch doesn't differentiate between frontend and backend sides (lbendig via daijy)
+
+PIG-3887: TestMRJobStats is broken in trunk (cheolsoo)
+
 PIG-3868: Fix Iterator_1 e2e test on windows (ssvinarchukhorton via rohini)
 
 PIG-3871: Replace org.python.google.* with com.google.* in imports (cheolsoo)

Modified: pig/branches/tez/bin/pig.cmd
URL: http://svn.apache.org/viewvc/pig/branches/tez/bin/pig.cmd?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/bin/pig.cmd (original)
+++ pig/branches/tez/bin/pig.cmd Tue Apr 29 17:36:49 2014
@@ -93,6 +93,9 @@ set PIGARGS=
     for %%i in (%PIG_HOME%\*.jar) do (
       set CLASSPATH=!CLASSPATH!;%%i
     )
+    for %%i in (%PIG_HOME%\lib\*.jar) do (
+      set CLASSPATH=!CLASSPATH!;%%i
+    )
     if not defined PIG_CONF_DIR (
       set PIG_CONF_DIR=%PIG_HOME%\conf
     )

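The hunk above makes pig.cmd pick up the jars that the build now bundles under %PIG_HOME%\lib (see the build.xml change below and PIG-3737 in CHANGES.txt); the !CLASSPATH! syntax relies on cmd's delayed expansion (setlocal enabledelayedexpansion). For reference, a minimal Java sketch of the same glob-and-append step; class and directory names here are illustrative, not part of Pig:

import java.io.File;

public class LibClasspathSketch {
    // Appends every jar under PIG_HOME/lib to a classpath string, mirroring
    // the pig.cmd loop above.
    public static void main(String[] args) {
        String pigHome = System.getenv("PIG_HOME");
        StringBuilder classpath = new StringBuilder(System.getProperty("java.class.path"));
        File[] jars = new File(pigHome, "lib").listFiles();
        if (jars != null) {
            for (File jar : jars) {
                if (jar.getName().endsWith(".jar")) {
                    classpath.append(File.pathSeparator).append(jar.getAbsolutePath());
                }
            }
        }
        System.out.println(classpath);
    }
}
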
Modified: pig/branches/tez/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/build.xml?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/build.xml (original)
+++ pig/branches/tez/build.xml Tue Apr 29 17:36:49 2014
@@ -986,14 +986,24 @@
         </copy>
 
         <copy todir="${tar.dist.dir}/lib">
-            <fileset dir="${ivy.lib.dir}" includes="jython*.jar"/>
-            <fileset dir="${ivy.lib.dir}" includes="jruby*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="jython-*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="jruby-*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="groovy-*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="js-*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="hbase-*.jar" excludes="hbase-*tests.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="protobuf-java-*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="zookeeper-*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="accumulo-*.jar" excludes="accumulo-minicluster*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="avro-*.jar" excludes="avro-*tests.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="json-simple-*.jar"/>
         </copy>
 
         <copy file="${output.jarfile.backcompat.withouthadoop}" tofile="${tar.dist.dir}/${final.name}-withouthadoop.jar" />
 
         <copy file="${output.jarfile.backcompat}" tofile="${tar.dist.dir}/${final.name}.jar" />
 
+        <copy todir="${tar.dist.dir}/lib" file="contrib/piggybank/java/piggybank.jar"/>
+
         <copy todir="${tar.dist.dir}/" file="ivy.xml" />
 
         <copy todir="${tar.dist.dir}/ivy">
@@ -1558,7 +1568,7 @@
 
      <target name="ivy-resolve" depends="ivy-init" unless="ivy.resolved" description="Resolve Ivy dependencies">
        <property name="ivy.resolved" value="true"/>
-       <ivy:resolve settingsRef="${ant.project.name}.ivy.settings"/>
+       <ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="compile"/>
      </target>
 
      <target name="ivy-compile" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for compile configuration">

Modified: pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Tue Apr 29 17:36:49 2014
@@ -18,693 +18,303 @@
 
 package org.apache.pig.piggybank.storage;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.tools.bzip2r.CBZip2InputStream;
-
-
-
 
 /**
- * A <code>XMLLoaderBufferedPositionedInputStream</code> is the package class and is the 
- * decorator over the BufferedPositionedInputStream which in turn decorate
- * BufferedInputStream. It contains <code>BufferedPositionedInputStream<code>
- * input stream, which it uses as
- * its  basic source of data, possibly reading or providing  additional
- * functionality. The class <code>XMLLoaderBufferedPositionedInputStream</code>
- * itself simply overrides the necessary medthod for reading i.e 
- * <code>read</code> <code>getPosition<code> with versions that
- * pass all requests to the contained  input
- * stream or do some special processing. Subclasses of <code>XMLLoaderBufferedPositionedInputStream</code>
- * may further override some of  these methods
- * and may also provide additional methods
- * and fields.
- * It also provides additional method <code>collectTag<collect> which will give the byte 
- * array between the tag which is a xml record. i.e <tag> .*</tag> will be returned
+ * Parses an XML input file given a specified identifier of tags to be loaded.
+ * The output is a bag of XML elements where each element is returned as
+ * a chararray containing the text of the matched XML element, including the
+ * start and end tags as well as the data between them. If matching tags are
+ * nested, only the top-level element is returned.
  *
- * @note we can't use the standard SAX or STAX parser as for a big xml 
- *       the intermittent hadoop block may not be the valid xml and hence those
- *       parser may create pb. 
- *
- * @since   pig 2.0
  */
+public class XMLLoader extends LoadFunc {
 
-class XMLLoaderBufferedPositionedInputStream extends BufferedPositionedInputStream { 
-
-    public final static int S_START = 0;
-    public final static int S_MATCH_PREFIX = 1;
-    public final static int S_MATCH_TAG = 2;
+  /**
+   * Use this record reader to read XML tags out of a text file. It matches only
+   * the tags identified by an identifier configured through a call to
+   * {@link #setXMLIdentifier(String)}. If there are nested tags with the given
+   * identifier, only the top-level one is returned, which also includes all
+   * enclosed tags.
+   */
+  public static class XMLRecordReader extends RecordReader<LongWritable, Text> {
+    protected final RecordReader<LongWritable, Text> wrapped;
 
+    /**Regular expression for XML tag identifier*/
+    private static final String XMLTagNameRegExp = "[a-zA-Z\\_][0-9a-zA-Z\\-_]+";
     /**
-     * The input streamed to be filtered 
+     * A regular expression that matches key parts in the XML text needed to
+     * correctly parse it and find matches of the given identifier
      */
-    InputStream wrapperIn;
+    private Pattern identifiersPattern;
 
-    /**
-     * The field to know if the underlying buffer contains any more bytes
-     */
-    boolean _isReadable;
+    private LongWritable key;
+    private Text value;
 
-    /**
-    * The field set the maximum bytes that is readable by this instance of stream.
-    */
-    private long maxBytesReadable = 0;
-    	
-    /**
-    * The field denote the number of bytes read by this stream. 
-    */
-    long bytesRead = 0;
-    	
-    /**
-    * Denotes the end of the current split location
-    */
-    long end = 0;
-    
-    /**
-     * Creates a <code>XMLLoaderBufferedPositionedInputStream</code>
-     * by assigning the  argument <code>in</code>
-     * to the field <code>this.wrapperIn</code> so as
-     * to remember it for later use.
-     *
-     * @param   in   the underlying input stream,
-     */
-    public XMLLoaderBufferedPositionedInputStream(InputStream in){
-        super(in);
-        this.wrapperIn = in;
-        setReadable(true);
-    }
-    
-    /**
-     * Creates a  split aware <code>XMLLoaderBufferedPositionedInputStream</code>.
-     * @param in    the underlying input stream
-     * @param start    start location of the split
-     * @param end    end location of the split
-     */
-    public XMLLoaderBufferedPositionedInputStream(InputStream in,long start,long end){
-       this(in);
-       this.end = end;
-       maxBytesReadable = end - start;
-    }
+    /**Position of the current buffer in the file*/
+    private long bufferPos;
 
-    /**
-     * Set the stream readable or non readable. This is needed
-     * to control the xml parsing.
-     * @param flag The boolean flag to be set
-     * @see XMLLoaderBufferedPositionedInputStream#isReadable
-     */
-    private void setReadable(boolean flag) {
-       _isReadable = flag;
-    }
+    /**Holds parts of the input file that were read but not parsed yet*/
+    private String buffer;
 
-    /**
-     * See if the stream readable or non readable. This is needed
-     * to control the xml parsing.
-     * @return  true if readable otherwise false
-     * @see XMLLoaderBufferedPositionedInputStream#setReadable
-     */
-    public boolean isReadable() {
-       return _isReadable == true;
-    }
+    /**Original end of the split to parse*/
+    private long originalEnd;
 
-    /**
-     * org.apache.pig.impl.io.BufferedPositionedInputStream.read
-     * It is just the wrapper for now.
-     * Reads the next byte of data from this input stream. The value
-     * byte is returned as an <code>int</code> in the range
-     * <code>0</code> to <code>255</code>. If no byte is available
-     * because the end of the stream has been reached, the value
-     * <code>-1</code> is returned. This method blocks until input data
-     * is available, the end of the stream is detected, or an exception
-     * is thrown.
-     * <p>
-     * This method
-     * simply performs <code>in.read()</code> and returns the result.
-     *
-     * @return     the next byte of data, or <code>-1</code> if the end of the
-     *             stream is reached.
-     * @exception  IOException  if an I/O error occurs.
-     * @see        XMLLoaderBufferedPositionedInputStreamInputStream#wrapperIn
-     */
-    public int read() throws IOException {
-       return wrapperIn.read();
-    }
-
-    /**
-     * This is collect the bytes from current position to the ending tag.
-     * This scans for the tags and do the pattern match byte by byte
-     * this must be used along with 
-     *  XMLLoaderBufferedPositionedInputStream#skipToTag
-     *
-     * @param tagName the end tag to search for
-     *
-     * @param limit the end pointer for the block for this mapper
-     *
-     * @return the byte array containing the documents until the end of tag
-     *
-     * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
-     *
-     */
-    private byte[] collectUntilEndTag(String tagName, long limit) {
+    private boolean terminated;
 
-      //@todo use the charset and get the charset encoding from the xml encoding.
-      byte[] tmp = tagName.getBytes();
-      ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
-      // Levels of elements we went inside matched node
-      int depth = 0;
-      
-      //Since skipToTag was called before this function, we know that we are
-      //currently inside the matched tag. Assuming the XML file is well
-      //structured, we read till we encounter the first close tag. Since
-      //the matched element might contain nested element, we keep track of the
-      //current depth and terminate only when we encounter a closing tag at
-      //level zero
-      
-      // A flag to indicate the parsing is currently inside a (start/end) tag
-      boolean insideTag = false;
-      // A flag to indicate that the current tag is a closing (end) tag
-      boolean closingTag = false;
-      
-      // Last byte read
-      int last_b = -1;
-      while (true) {
-        int b = -1;
-        try {
-          b = this.read();
-          ++bytesRead; // Add one to the bytes read
-          if (b == -1) {
-            collectBuf.reset();
-            this.setReadable(false);
-            break;
-          }
-          collectBuf.write((byte)(b));
-
-          // Check if the start tag has matched except for the last char
-          if (b == '<') {
-            insideTag = true;
-            closingTag = false;
-          } else if (b == '>') {
-            // Detect the pattern />
-            if (last_b == '/')
-              closingTag = true;
-            insideTag = false;
-            if (closingTag) {
-              if (depth == 0)
-                break;
-              depth--;
-            }
-          } else if (b == '/' && last_b == '<') {
-            // Detected the pattern </
-            closingTag = true;
-          } else if (insideTag && last_b == '<') {
-            // First character after '<' which is not a '/'
-            depth++;
-          }
-        }
-        catch (IOException e) {
-          this.setReadable(false);
-          return null;
-        }
-        last_b = b;
-      }
-      return collectBuf.toByteArray();
+    public XMLRecordReader(RecordReader<LongWritable, Text> wrapped) {
+      this.wrapped = wrapped;
     }
 
     /**
-     * This is collect the from the matching tag.
-     * This scans for the tags and do the pattern match byte by byte
-     * This returns a part doc. it must be used along with 
-     * XMLLoaderBufferedPositionedInputStream#collectUntilEndTag
-     * 
-     * @param tagName the start tag to search for
-     *
-     * @param limit the end pointer for the block for this mapper
-     *
-     * @return the byte array containing match of the tag.
-     *
-     * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
-     *
+     * Delegates initialization to the wrapped reader after extending the split
+     * length, so that a record spanning the split boundary can still be read.
      */
-    private byte[] skipToTag(String tagName, long limit) throws IOException {
-      
-      //@todo use the charset and get the charset encoding from the xml encoding.
-      byte[] tmp = tagName.getBytes();
-      byte[] tag = new byte[tmp.length + 1];
-      tag[0] = (byte)'<';
-      for (int i = 0; i < tmp.length; ++i) {
-        tag[1+i] = tmp[i];
-      }
-
-      ByteArrayOutputStream matchBuf = new ByteArrayOutputStream(512);
-      int idxTagChar = 0;
-      int state = S_START;
-      
-      /*
-       * Read till the tag is found in this block. If a partial tag block is found
-       * then continue on to the next block.matchBuf contains the data that is currently 
-       * matched. If the read has reached the end of split and there are matched data 
-       * then continue on to the next block.
-       */
-      while (splitBoundaryCriteria(wrapperIn) ||  (matchBuf.size() > 0 )) {
-        int b = -1;
-        try {
-          b = this.read();
-          ++bytesRead; // Increment the bytes read by 1
-          if (b == -1) {
-            state = S_START;
-            matchBuf.reset();
-            this.setReadable(false);
-            break;
-          }
-          switch (state) {
-            case S_START:
-              // start to match the target open tag
-              if (b == tag[idxTagChar]) {
-                ++idxTagChar;
-                matchBuf.write((byte)(b));
-                if (idxTagChar == tag.length) {
-                  state = S_MATCH_PREFIX;
-                }
-              } else {  // mismatch
-                idxTagChar = 0;
-                matchBuf.reset();
-              }
-              break;
-            case S_MATCH_PREFIX:
-              // tag match iff next character is whitespaces or close tag mark
-              if (Character.isWhitespace(b) || b == '/' || b == '>') {
-                matchBuf.write((byte)(b));
-                state = S_MATCH_TAG;
-              } else {
-                idxTagChar = 0;
-                matchBuf.reset();
-                state = S_START;
-              }
-              break;
-            case S_MATCH_TAG:
-              // keep copy characters until we hit the close tag mark
-              matchBuf.write((byte)(b));
-              break;
-            default:
-              throw new IllegalArgumentException("Invalid state: " + state);
-          }
-          if (state == S_MATCH_TAG && (b == '>' || Character.isWhitespace(b))) {
-            break;
-          }
-          if (state != S_MATCH_TAG && this.getPosition() > limit) {
-            // need to break, no record in this block
-            break;
-          }
-        }
-        catch (IOException e) {
-          this.setReadable(false);
-          return null;
-        }
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context)
+	throws IOException, InterruptedException {
+      key = new LongWritable();
+      value = new Text();
+      if (split instanceof FileSplit) {
+	FileSplit fsplit = (FileSplit) split;
+	originalEnd = fsplit.getStart() + fsplit.getLength();
+	Path path = fsplit.getPath();
+	long fileEnd = path.getFileSystem(context.getConfiguration()).getFileStatus(path).getLen();
+	FileSplit extendedSplit = new FileSplit(path, fsplit.getStart(),
+	    Math.min(fsplit.getLength() * 10, fileEnd - fsplit.getStart()), fsplit.getLocations());
+	this.wrapped.initialize(extendedSplit, context);
+      } else {
+	throw new RuntimeException("Cannot override a split of type '"+
+	    split.getClass()+"'");
       }
-      return matchBuf.toByteArray();
-    }
-    /**
-     * Returns whether the split boundary condition has reached or not.
-     * For normal files ; the condition is to read till the split end reaches.
-     * Gz files will have  maxBytesReadable set to near Long.MAXVALUE, hence
-     * this will cause the entire file to be read. For bz2 and bz files, the 
-     * condition lies on the position which until which it is read. 
-     *  
-     * @param wrapperIn2
-     * @return true/false depending on whether split boundary has reached or no
-     * @throws IOException
-     */
-    private boolean splitBoundaryCriteria(InputStream wrapperIn2) throws IOException {
-       if(wrapperIn2 instanceof CBZip2InputStream)
-          return ((CBZip2InputStream)wrapperIn2).getPos() <= end;
-       else
-          return bytesRead <= maxBytesReadable;
     }
 
-    /**
-     * This is collect bytes from start and end tag both inclusive
-     * This scans for the tags and do the pattern match byte by byte
-     * 
-     * @param tagName the start tag to search for
-     *
-     * @param limit the end pointer for the block for this mapper
-     *
-     * @return the byte array containing match of the <code><tag>.*</tag><code>.
-     *
-     * @see loader.XMLLoaderBufferedPositionedInputStream.skipToTag
-     *
-     * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
-     *
-     */
-    byte[] collectTag(String tagName, long limit) throws IOException {
-       ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
-       byte[] beginTag = skipToTag(tagName, limit);
-       
-       // Check if the tag is closed inline
-       if (beginTag.length > 2 && beginTag[beginTag.length - 2] == '/' &&
-           beginTag[beginTag.length-1] == '>') {
-         return beginTag;
-       }
-
-       // No need to search for the end tag if the start tag is not found
-       if(beginTag.length > 0 ){ 
-          byte[] untilTag = collectUntilEndTag(tagName, limit);
-          if (untilTag.length > 0) {
-             for (byte b: beginTag) {
-             collectBuf.write(b);
-           }
-           for (byte b: untilTag) {
-              collectBuf.write(b);
-           }
-          }
-       }
-       return collectBuf.toByteArray();
+    public void setXMLIdentifier(String identifier) {
+      if (!identifier.matches(XMLTagNameRegExp))
+	throw new RuntimeException("XML tag identifier '"+identifier+"' does not match the regular expression /"+XMLTagNameRegExp+"/");
+      String inlineClosedTagRegExp = "<\\s*"+identifier+"\\s*[^>]*/>";
+      String openTagRegExp = "<\\s*"+identifier+"(?:\\s*|\\s+(?:[^/>]*|[^>]*[^>/]))>";
+      String closeTagRegExp = "</\\s*"+identifier+"\\s*>";
+      identifiersPattern = Pattern.compile("("+inlineClosedTagRegExp+")|("+openTagRegExp+")|("+closeTagRegExp+")");
     }
 
-}
-
-
-/**
- * The load function to load the XML file
- * This implements the LoadFunc interface which is used to parse records
- * from a dataset. The various helper adaptor function is extended from loader.Utf8StorageConverter
- * which included various functions to cast raw byte data into various datatypes. 
- * other sections of the code can call back to the loader to do the cast.
- * This takes a xmlTag as the arg which it will use to split the inputdataset into
- * multiple records. 
- * <code>
- *    
- * For example if the input xml (input.xml) is like this
- *     <configuration>
- *         <property>
- *            <name> foobar </name>
- *            <value> barfoo </value>
- *         </property>
- *         <ignoreProperty>
- *           <name> foo </name>
- *         </ignoreProperty>
- *         <property>
- *            <name> justname </name>
- *         </property>
- *     </configuration>
- *
- *    And your pig script is like this
- *
- *    --load the jar files
- *    register /homes/aloks/pig/udfLib/loader.jar;
- *    -- load the dataset using XMLLoader
- *    -- A is the bag containing the tuple which contains one atom i.e doc see output
- *    A = load '/user/aloks/pig/input.xml using loader.XMLLoader('property') as (doc:chararray);
- *    --dump the result
- *    dump A;
- *
- *
- *    Then you will get the output
- *
- *    (<property>
- *             <name> foobar </name>
- *             <value> barfoo </value>
- *          </property>)
- *    (<property>
- *             <name> justname </name>
- *          </property>)
- *
- *
- *    Where each () indicate one record 
- *
- * 
- * </code>
- */
-
-public class XMLLoader extends LoadFunc {
-
-    /**
-     * logger from pig
-     */
-    protected final Log mLog = LogFactory.getLog(getClass());
-
-    private XMLFileRecordReader reader = null;
-
-
-    /**
-     * the tuple content which is used while returning
-     */
-    private ArrayList<Object> mProtoTuple = null;
-
-    /**
-     * The record seperated. The default value is 'document'
-     */
-    public String recordIdentifier = "document";
-
-    private String loadLocation;
-    
-    public XMLLoader() {
-
+    /* Delegate all methods to the wrapped stream */
+    public void close() throws IOException {
+      wrapped.close();
     }
 
-    /**
-     * Constructs a Pig loader that uses specified string as the record seperater
-     * for example if the recordIdentifier is document. It will consider the record as 
-     * <document> .* </document>
-     * 
-     * @param recordIdentifier the xml tag which is used to pull records
-     *
-     */
-    public XMLLoader(String recordIdentifier) {
-        this();
-        this.recordIdentifier = recordIdentifier;
+    public boolean equals(Object obj) {
+      return wrapped.equals(obj);
     }
 
-    /**
-     * Retrieves the next tuple to be processed.
-     * @return the next tuple to be processed or null if there are no more tuples
-     * to be processed.
-     * @throws IOException
-     */
-    @Override
-    public Tuple getNext() throws IOException {
- 
-        boolean next = false;
-        
-        try {
-            next = reader.nextKeyValue();
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-        
-        if (!next) return null;
-        
-        Tuple t = null;
-     
-        try {
-            byte[] tagContent = (byte[]) reader.getCurrentValue();
-            // No need to create the tuple if there are no contents
-            t = (tagContent.length > 0) ? createTuple(tagContent) : null;
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-
-        return t; 
- 
-    }
-    
-    public Tuple createTuple(byte[] content) throws Exception {
-        if (mProtoTuple == null) {
-            mProtoTuple = new ArrayList<Object>();
-        }
-        if (content.length > 0) {
-            mProtoTuple.add(new DataByteArray(content));
-        }
-        Tuple t = TupleFactory.getInstance().newTupleNoCopy(mProtoTuple);
-        mProtoTuple = null;
-
-        return t;
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+      return key;
     }
 
-    /**
-     * to check for equality 
-     * @param object 
-     */
-    public boolean equals(Object obj) {
-        return equals((XMLLoader)obj);
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return value;
     }
 
-    /**
-     * to check for equality 
-     * @param XMLLoader object 
-     */
-    public boolean equals(XMLLoader other) {
-        return this.recordIdentifier.equals(other.recordIdentifier);
+    public float getProgress() throws IOException, InterruptedException {
+      return Math.min(1.0f, this.wrapped.getProgress() * 10);
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public InputFormat getInputFormat() throws IOException {
-       XMLFileInputFormat inputFormat = new XMLFileInputFormat(recordIdentifier);
-       if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
-          inputFormat.isSplitable = true;
-         }
-       return inputFormat;
+    public int hashCode() {
+      return wrapped.hashCode();
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void prepareToRead(RecordReader reader, PigSplit split)
-            throws IOException {
-        this.reader = (XMLFileRecordReader) reader;
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      if (this.terminated)
+	return false;
+      int depth = 0;
+      // In case of a tag matched via separate open and close tags, this buffer
+      // is used to accumulate the matched element if it spans multiple lines.
+      StringBuffer currentMatch = new StringBuffer();
+      // The start offset of the first matched open tag. This marks the first
+      // byte in the range to be copied to the output.
+      int offsetOfFirstMatchedOpenTag = 0;
+      try {
+      while (true) {
+	while (buffer == null || buffer.length() == 0) {
+	  if (!wrapped.nextKeyValue())
+	    return false; // End of split
+	  // if passed the end offset of current split, terminate the matching
+	  if (bufferPos >= originalEnd && depth == 0) {
+	    this.terminated = true;
+	    return false;
+	  }
+
+	  bufferPos = wrapped.getCurrentKey().get();
+	  buffer = wrapped.getCurrentValue().toString();
+	}
+	Matcher matcher = identifiersPattern.matcher(buffer);
+	while (matcher.find()) {
+	  int startOfCurrentMatch = matcher.start();
+	  int endOfCurrentMatch = matcher.end();
+	  String group;
+	  if ((group = matcher.group(1)) != null) {
+	    // Matched an inline-closed tag
+	    value = new Text(group);
+	    this.key.set(bufferPos + matcher.start(1));
+	    bufferPos += matcher.end(1);
+	    buffer = buffer.substring(endOfCurrentMatch);
+	    return true;
+	  } else if ((group = matcher.group(2)) != null) {
+	    // Matched an open tag
+	    // If this is a top-level match (i.e., not enclosed in another matched
+	    // tag), all bytes starting from this offset will be copied to output
+	    // in one of two cases:
+	    // 1- When a matching close tag is found
+	    // 2- When an end of line is encountered
+	    if (depth == 0) {
+	      offsetOfFirstMatchedOpenTag = startOfCurrentMatch;
+	      this.key.set(bufferPos + startOfCurrentMatch);
+	    }
+	    depth++;
+	  } else if ((group = matcher.group(3)) != null) {
+	    // Matched a closed tag
+	    if (depth > 0) {
+	      depth--;
+	      if (depth == 0) {
+		// A full top-level match
+		// Copy all bytes to output
+		if (currentMatch.length() == 0) {
+		  // A full match in one line, return it immediately
+		  value = new Text(buffer.substring(offsetOfFirstMatchedOpenTag, endOfCurrentMatch));
+		} else {
+		  currentMatch.append(buffer, offsetOfFirstMatchedOpenTag, endOfCurrentMatch);
+		  value = new Text(currentMatch.toString());
+		}
+		// Copy remaining non matched part to the buffer for next call
+		buffer = buffer.substring(endOfCurrentMatch);
+		bufferPos += endOfCurrentMatch;
+		return true;
+	      }
+	    }
+	  } else {
+	    throw new RuntimeException("Invalid match '"+matcher.group()+"' in string '"+buffer+"'");
+	  }
+	}
+	// No more matches in the current line. If we are inside a match (i.e.,
+	// an open tag has been matched), append the unconsumed part of the line
+	// to the current match. Otherwise, just drop the line.
+	if (depth > 0) {
+	  // Inside a match
+	  currentMatch.append(buffer, offsetOfFirstMatchedOpenTag, buffer.length());
+	}
+	buffer = null;
+      }
+      } catch (InterruptedException e) {
+	throw new IOException("Error getting input", e);
+      }
+
     }
 
-    @Override
-    public void setLocation(String location, Job job) throws IOException {
-        loadLocation = location; 
-        FileInputFormat.setInputPaths(job, location);
-    }
-    
-    //------------------------------------------------------------------------
-    // Implementation of InputFormat
-    
-    public static class XMLFileInputFormat extends FileInputFormat {
-
-       /**
-        * Boolean flag used to identify whether splittable property is explicitly set.
-        */
-        private boolean isSplitable = false;
-        
-        private String recordIdentifier;
-        
-        public XMLFileInputFormat(String recordIdentifier) {
-            this.recordIdentifier = recordIdentifier;
-        }
-        
-        @SuppressWarnings("unchecked")
-        @Override
-        public RecordReader createRecordReader(InputSplit split,
-                TaskAttemptContext context) throws IOException,
-                InterruptedException {
-            
-            return new XMLFileRecordReader(recordIdentifier);
-        }  
-        
-        @Override
-        protected boolean isSplitable(JobContext context, Path filename) {
-           CompressionCodec codec = 
-              new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
-           return (!(codec == null)) ? isSplitable : true;
-        }
-    }
-    
-    //------------------------------------------------------------------------
-    // Implementation of RecordReader
-    
-    public static class XMLFileRecordReader extends RecordReader {
-
-        private long start;
-        private long end;
-        private String recordIdentifier;
-        
-        /*
-         * xmlloader input stream which has the ability to split the input
-         * dataset into records by the specified tag
-         */
-        private XMLLoaderBufferedPositionedInputStream xmlLoaderBPIS = null;
-
-        public XMLFileRecordReader(String recordIdentifier) {
-            this.recordIdentifier = recordIdentifier;
-        }
-        
-        @Override
-        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
-                throws IOException, InterruptedException {
-            FileSplit split = (FileSplit) genericSplit;
-            Configuration job = context.getConfiguration();
- 
-            start = split.getStart();
-            end = start + split.getLength();
-            final Path file = split.getPath();
- 
-            // open the file and seek to the start of the split
-            FileSystem fs = file.getFileSystem(job);
-            FSDataInputStream fileIn = fs.open(split.getPath());
-            
-            // Seek to the start of the file
-            fileIn.seek(start);
-        
-            if(file.toString().endsWith(".bz2") || file.toString().endsWith(".bz"))
-            {
-            	// For bzip2 files use CBZip2InputStream to read and supply the upper input stream.
-               CBZip2InputStream in = new CBZip2InputStream(fileIn,9, end);
-               this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(in,start,end);
-            }
-            else if (file.toString().endsWith(".gz"))
-            {
-            	CompressionCodecFactory compressionCodecs =  new CompressionCodecFactory(job);
-            	final CompressionCodec codec = compressionCodecs.getCodec(file);
-            	 if (codec != null) {
-            	    end = Long.MAX_VALUE;
-            	      CompressionInputStream stream = codec.createInputStream(fileIn);
-            	      this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream,start,end);
-            	    }
-            }
-            
-            else
-            {
-               this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn,start,end);
-            }
-        }
-
-        
-        @Override
-        public void close() throws IOException {
-            xmlLoaderBPIS.close();
-        }
-
-        @Override
-        public Object getCurrentKey() throws IOException, InterruptedException {
-            return null;
-        }
-
-        @Override
-        public Object getCurrentValue() throws IOException,
-                InterruptedException {            
-            return xmlLoaderBPIS.collectTag(recordIdentifier, end);
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
- 
-            return 0;
-        }
-
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            return xmlLoaderBPIS.isReadable();
-        }
-        
+    public String toString() {
+      return wrapped.toString();
     }
+  }
+
+  /**Location of the file loaded*/
+  private String loadLocation;
+
+  /**Underlying record reader*/
+  @SuppressWarnings("rawtypes")
+  protected RecordReader in = null;
+
+  /**XML tag to parse*/
+  private String identifier;
+
+  public XMLLoader(String identifier) {
+    this.identifier = identifier;
+  }
+
+  @Override
+  public void prepareToRead(RecordReader reader, PigSplit split)
+      throws IOException {
+    in = reader;
+  }
+
+  @Override
+  public Tuple getNext() throws IOException {
+    try {
+      if (!in.nextKeyValue())
+	return null;
+      Tuple tuple = createTuple(in.getCurrentValue().toString());
+      return tuple;
+    } catch (InterruptedException e) {
+      // Propagate the failure; returning null here would be read as end of data
+      throw new IOException("Error getting input", e);
+    }
+  }
+
+
+  /**
+   * Creates a tuple from a matched string
+   */
+  public Tuple createTuple(String str) {
+    return TupleFactory.getInstance().newTuple(new DataByteArray(str));
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public InputFormat getInputFormat() throws IOException {
+    if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+      return new Bzip2TextInputFormat() {
+	@Override
+	public RecordReader<LongWritable, Text> createRecordReader(
+	    InputSplit split, TaskAttemptContext context) {
+	  try {
+	    RecordReader<LongWritable, Text> originalReader =
+		super.createRecordReader(split, context);
+	    XMLRecordReader reader = new XMLRecordReader(originalReader);
+	    reader.setXMLIdentifier(identifier);
+	    return reader;
+	  } catch (IOException e) {
+	    throw new RuntimeException("Cannot create input split", e);
+	  } catch (InterruptedException e) {
+	    throw new RuntimeException("Cannot create input split", e);
+	  }
+	}
+      };
+    } else {
+      return new PigTextInputFormat() {
+	@Override
+	public RecordReader<LongWritable, Text> createRecordReader(
+	    InputSplit split, TaskAttemptContext context) {
+	  RecordReader<LongWritable, Text> originalReader =
+	      super.createRecordReader(split, context);
+	  XMLRecordReader reader = new XMLRecordReader(originalReader);
+	  reader.setXMLIdentifier(identifier);
+	  return reader;
+	}
+      };
+    }
+  }
+
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+    loadLocation = location;
+    FileInputFormat.setInputPaths(job, location);
+  }
 }

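The rewrite above replaces the old byte-at-a-time stream scanner with a regex-driven reader wrapped around a plain text record reader: setXMLIdentifier builds a pattern whose group 1 matches an inline-closed tag, group 2 an open tag, and group 3 a close tag, and nextKeyValue keeps a depth counter so only top-level elements are emitted as records. A self-contained sketch of that matching loop under those assumptions; the sample XML, class name, and the depth check on inline tags are illustrative:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TagMatchSketch {
    public static void main(String[] args) {
        String id = "event";
        // The same three alternatives XMLRecordReader.setXMLIdentifier builds:
        // group 1 = inline-closed tag, group 2 = open tag, group 3 = close tag.
        Pattern p = Pattern.compile(
            "(<\\s*" + id + "\\s*[^>]*/>)"
            + "|(<\\s*" + id + "(?:\\s*|\\s+(?:[^/>]*|[^>]*[^>/]))>)"
            + "|(</\\s*" + id + "\\s*>)");
        String xml = "<events><event id='1'>data</event><event id='2'/></events>";
        Matcher m = p.matcher(xml);
        int depth = 0, start = 0;
        while (m.find()) {
            if (m.group(1) != null) {
                if (depth == 0) System.out.println("record: " + m.group(1));
            } else if (m.group(2) != null) {
                if (depth++ == 0) start = m.start(); // remember top-level open tag
            } else if (m.group(3) != null && depth > 0 && --depth == 0) {
                System.out.println("record: " + xml.substring(start, m.end()));
            }
        }
    }
}

Run against the sample input, this prints the two <event> records and skips the enclosing <events> element, matching what the new loader returns for a one-line file. Note that the alternation never matches <events> when the identifier is 'event', because each alternative requires whitespace, attributes, or the closing bracket right after the identifier.
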
Modified: pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Tue Apr 29 17:36:49 2014
@@ -19,6 +19,7 @@ package org.apache.pig.piggybank.storage
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -27,9 +28,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -53,6 +57,7 @@ import org.apache.pig.StoreFunc;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -64,9 +69,13 @@ import org.json.simple.parser.ParseExcep
  */
 public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata {
 
+    private static final Log LOG = LogFactory.getLog(AvroStorage.class);
     /* storeFunc parameters */
     private static final String NOTNULL = "NOTNULL";
     private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
+    private static final String AVRO_INPUT_SCHEMA_PROPERTY = "avro_input_schema";
+    private static final String AVRO_INPUT_PIG_SCHEMA_PROPERTY = "avro_input_pig_schema";
+    private static final String AVRO_MERGED_SCHEMA_PROPERTY = "avro_merged_schema_map";
     private static final String SCHEMA_DELIM = "#";
     private static final String SCHEMA_KEYVALUE_DELIM = "@";
     private static final String NO_SCHEMA_CHECK = "no_schema_check";
@@ -145,12 +154,38 @@ public class AvroStorage extends FileInp
     /**
      * Set input location and obtain input schema.
      */
+    @SuppressWarnings("unchecked")
     @Override
     public void setLocation(String location, Job job) throws IOException {
         if (inputAvroSchema != null) {
             return;
         }
 
+        if (!UDFContext.getUDFContext().isFrontend()) {
+            Properties udfProps = getUDFProperties();
+            String mergedSchema = udfProps.getProperty(AVRO_MERGED_SCHEMA_PROPERTY);
+            if (mergedSchema != null) {
+                HashMap<URI, Map<Integer, Integer>> mergedSchemaMap =
+                        (HashMap<URI, Map<Integer, Integer>>) ObjectSerializer.deserialize(mergedSchema);
+                schemaToMergedSchemaMap = new HashMap<Path, Map<Integer, Integer>>();
+                for (Entry<URI, Map<Integer, Integer>> entry : mergedSchemaMap.entrySet()) {
+                    schemaToMergedSchemaMap.put(new Path(entry.getKey()), entry.getValue());
+                }
+            }
+            String schema = udfProps.getProperty(AVRO_INPUT_SCHEMA_PROPERTY);
+            if (schema != null) {
+                try {
+                    inputAvroSchema = new Schema.Parser().parse(schema);
+                    return;
+                } catch (Exception e) {
+                    // Cases like testMultipleSchemas2 cause exception while deserializing
+                    // symbols. In that case, we get it again.
+                    LOG.warn("Exception while trying to deserialize schema in backend. " +
+                            "Will construct again. schema= " + schema, e);
+                }
+            }
+        }
+
         Configuration conf = job.getConfiguration();
         Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
         if (!paths.isEmpty()) {
@@ -158,10 +193,19 @@ public class AvroStorage extends FileInp
             // bloat configuration size
             FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
             // Scan all directories including sub directories for schema
-            setInputAvroSchema(paths, conf);
+            if (inputAvroSchema == null) {
+                setInputAvroSchema(paths, conf);
+            }
         } else {
             throw new IOException("Input path \'" + location + "\' is not found");
         }
+
+    }
+
+    @Override
+    public void setUDFContextSignature(String signature) {
+        this.contextSignature = signature;
+        super.setUDFContextSignature(signature);
     }
 
     /**
@@ -284,7 +328,8 @@ public class AvroStorage extends FileInp
             }
         }
         // schemaToMergedSchemaMap is only needed when merging multiple records.
-        if (mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
+        if ((schemaToMergedSchemaMap == null || schemaToMergedSchemaMap.isEmpty()) &&
+                mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
             schemaToMergedSchemaMap = AvroStorageUtils.getSchemaToMergedSchemaMap(result, mergedFiles);
         }
         return result;
@@ -398,6 +443,19 @@ public class AvroStorage extends FileInp
             if (pigSchema.getFields().length == 1){
                 pigSchema = pigSchema.getFields()[0].getSchema();
             }
+            Properties udfProps = getUDFProperties();
+            udfProps.put(AVRO_INPUT_SCHEMA_PROPERTY, inputAvroSchema.toString());
+            udfProps.put(AVRO_INPUT_PIG_SCHEMA_PROPERTY, pigSchema);
+            if (schemaToMergedSchemaMap != null) {
+                HashMap<URI, Map<Integer, Integer>> mergedSchemaMap = new HashMap<URI, Map<Integer, Integer>>();
+                for (Entry<Path, Map<Integer, Integer>> entry : schemaToMergedSchemaMap.entrySet()) {
+                    //Path is not serializable
+                    mergedSchemaMap.put(entry.getKey().toUri(), entry.getValue());
+                }
+                udfProps.put(AVRO_MERGED_SCHEMA_PROPERTY,
+                        ObjectSerializer.serialize(mergedSchemaMap));
+            }
+
             return pigSchema;
         } else {
             return null;
@@ -731,6 +789,7 @@ public class AvroStorage extends FileInp
     @Override
     public void setStoreFuncUDFContextSignature(String signature) {
         this.contextSignature = signature;
+        super.setUDFContextSignature(signature);
     }
 
     @Override

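The AvroStorage change above is the PIG-3771 fix from the CHANGES list: the input schema (and the merged-schema map) is computed once on the frontend, serialized into the UDFContext properties under the UDF's signature, and read back on the backend so tasks stop issuing namenode calls to rediscover it. A condensed sketch of that frontend-to-backend caching pattern, assuming a hypothetical loader; the class, property key, and computeSchemaByScanningFiles step are illustrative, not AvroStorage's actual members:

import java.util.Properties;

import org.apache.pig.impl.util.UDFContext;

public class SchemaCachingSketch {
    private static final String SCHEMA_KEY = "cached_input_schema"; // illustrative key
    private String signature; // assigned via setUDFContextSignature, as in the diff

    public void setUDFContextSignature(String signature) {
        this.signature = signature;
    }

    // Properties scoped to this UDF instance; Pig ships them to backend tasks.
    private Properties udfProps() {
        return UDFContext.getUDFContext()
                .getUDFProperties(getClass(), new String[] { signature });
    }

    // Frontend: compute the schema once and cache it for the backend.
    public String getSchema(String location) {
        String schema = computeSchemaByScanningFiles(location);
        udfProps().setProperty(SCHEMA_KEY, schema);
        return schema;
    }

    // Backend: reuse the cached schema instead of re-listing files.
    public void setLocation(String location) {
        if (!UDFContext.getUDFContext().isFrontend()
                && udfProps().getProperty(SCHEMA_KEY) != null) {
            return; // cache hit: no namenode traffic from the task
        }
        computeSchemaByScanningFiles(location); // frontend, or backend cache miss
    }

    private String computeSchemaByScanningFiles(String location) {
        return "{\"type\":\"record\"}"; // placeholder for the expensive scan
    }
}

This is also why the diff overrides setUDFContextSignature on the load side: without the signature, the backend cannot locate the properties the frontend stored.
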
Modified: pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Tue Apr 29 17:36:49 2014
@@ -17,21 +17,29 @@ import static org.apache.pig.ExecType.LO
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Random;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.test.Util;
+import org.w3c.dom.Document;
 
 public class TestXMLLoader extends TestCase {
-  private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
   public static ArrayList<String[]> data = new ArrayList<String[]>();
   static {
     data.add(new String[] { "<configuration>"});
@@ -89,13 +97,11 @@ public class TestXMLLoader extends TestC
     inlineClosedTags.add(new String[] { "<event id='33'><tag k='a' v='b'/></event>"});
     inlineClosedTags.add(new String[] { "</events>"});
   }
-  
   public void testShouldReturn0TupleCountIfSearchTagIsNotFound () throws Exception
   {
     String filename = TestHelper.createTempFile(data, "");
     PigServer pig = new PigServer(LOCAL);
     filename = filename.replace("\\", "\\\\");
-    patternString = patternString.replace("\\", "\\\\");
     String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('invalid') as (doc:chararray);";
     pig.registerQuery(query);
     Iterator<?> it = pig.openIterator("A");
@@ -118,7 +124,6 @@ public class TestXMLLoader extends TestC
     String filename = TestHelper.createTempFile(data, "");
     PigServer pig = new PigServer(LOCAL);
     filename = filename.replace("\\", "\\\\");
-    patternString = patternString.replace("\\", "\\\\");
     String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);";
     pig.registerQuery(query);
     Iterator<?> it = pig.openIterator("A");
@@ -175,7 +180,6 @@ public class TestXMLLoader extends TestC
     }
 	    
    }
-
    public void testLoaderShouldLoadBasicGzFile() throws Exception {
     String filename = TestHelper.createTempFile(data, "");
 	  
@@ -224,7 +228,6 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(testData, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
       String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('name') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
@@ -248,7 +251,6 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(data, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
       String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
@@ -265,14 +267,13 @@ public class TestXMLLoader extends TestC
       }
       assertEquals(1, tupleCount);  
    }
-   
    public void testShouldReturn0TupleCountIfNoEndTagIsFound() throws Exception
    {
       // modify the data content to avoid end tag for </ignoreProperty>
       ArrayList<String[]> testData = new ArrayList<String[]>();
       for (String content[] : data) {
          
-         if(false == data.equals(new String[] { "</ignoreProperty>"}))
+         if(!content[0].equals("</ignoreProperty>"))
          {
             testData.add(content);
          }
@@ -281,8 +282,7 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(testData, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
-      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
       int tupleCount = 0;
@@ -307,8 +307,7 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(testData, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
-      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
       int tupleCount = 0;
@@ -330,7 +329,6 @@ public class TestXMLLoader extends TestC
       String filename = TestHelper.createTempFile(nestedTags, "");
       PigServer pig = new PigServer(LOCAL);
       filename = filename.replace("\\", "\\\\");
-      patternString = patternString.replace("\\", "\\\\");
       String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
       pig.registerQuery(query);
       Iterator<?> it = pig.openIterator("A");
@@ -353,7 +351,6 @@ public class TestXMLLoader extends TestC
      String filename = TestHelper.createTempFile(inlineClosedTags, "");
      PigServer pig = new PigServer(LOCAL);
      filename = filename.replace("\\", "\\\\");
-     patternString = patternString.replace("\\", "\\\\");
      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
      pig.registerQuery(query);
      Iterator<?> it = pig.openIterator("A");
@@ -375,7 +372,6 @@ public class TestXMLLoader extends TestC
      String filename = TestHelper.createTempFile(inlineClosedTags, "");
      PigServer pig = new PigServer(LOCAL);
      filename = filename.replace("\\", "\\\\");
-     patternString = patternString.replace("\\", "\\\\");
      String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
      pig.registerQuery(query);
      Iterator<?> it = pig.openIterator("A");
@@ -391,4 +387,108 @@ public class TestXMLLoader extends TestC
        }
      }
    }
+
+   /**
+    * Tests the special case where a non-matching tag spans two file splits
+    * in a .bz2 compressed file, while the part that falls in the first
+    * split is a prefix of the matching tag. In other words, up to the end
+    * of the first split the tag appears to match, but it does not actually
+    * match.
+    *
+    * @throws Exception
+    */
+   public void testXMLLoaderShouldNotReturnLastNonMatchedTag() throws Exception {
+     Configuration conf = new Configuration();
+     long blockSize = 100 * 1024;
+     conf.setLong("fs.local.block.size", blockSize);
+
+     String tagName = "event";
+
+     PigServer pig = new PigServer(LOCAL, conf);
+     FileSystem localFs = FileSystem.getLocal(conf);
+     FileStatus[] testFiles = localFs.globStatus(new Path("src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/*xml.bz2"));
+     assertTrue("No test files", testFiles.length > 0);
+     for (FileStatus testFile : testFiles) {
+       String testFileName = testFile.getPath().toUri().getPath().replace("\\", "\\\\");
+       String query = "A = LOAD '" + testFileName + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
+       pig.registerQuery(query);
+       Iterator<?> it = pig.openIterator("A");
+       while (it.hasNext()) {
+         Tuple tuple = (Tuple) it.next();
+         if (tuple == null)
+           break;
+         else {
+           if (tuple.size() > 0) {
+             assertTrue(((String)tuple.get(0)).startsWith("<"+tagName+">"));
+           }
+         }
+       }
+     }
+   }
+
+   /**
+    * Checks that a multi-line element spanning two file splits is
+    * matched.
+    * @throws Exception
+    */
+   public void testXMLLoaderShouldMatchTagSpanningSplits() throws Exception {
+     Configuration conf = new Configuration();
+     long blockSize = 512;
+     conf.setLong("fs.local.block.size", blockSize);
+     conf.setLong("mapred.max.split.size", blockSize);
+
+     String tagName = "event";
+     File tempFile = File.createTempFile("long-file", ".xml");
+     FileSystem localFs = FileSystem.getLocal(conf);
+     FSDataOutputStream directOut = localFs.create(new Path(tempFile.getAbsolutePath()), true);
+
+     String matchingElement = "<event>\ndata\n</event>\n";
+     long pos = 0;
+     int matchingCount = 0;
+     PrintStream ps = new PrintStream(directOut);
+     // 1- Write some elements that fit completely in the first block
+     while (pos + 2 * matchingElement.length() < blockSize) {
+       ps.print(matchingElement);
+       pos += matchingElement.length();
+       matchingCount++;
+     }
+     // 2- Write a long element that spans multiple lines and multiple blocks
+     String longElement = matchingElement.replace("data",
+         "data\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\n");
+     ps.print(longElement);
+     pos += longElement.length();
+     matchingCount++;
+     // 3- Write some more elements to fill in the second block completely
+     while (pos < 2 * blockSize) {
+       ps.print(matchingElement);
+       pos += matchingElement.length();
+       matchingCount++;
+     }
+     ps.close();
+     
+     
+     PigServer pig = new PigServer(LOCAL, conf);
+     String tempFileName = tempFile.getAbsolutePath().replace("\\", "\\\\");
+     String query = "A = LOAD '" + tempFileName + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
+     pig.registerQuery(query);
+     Iterator<?> it = pig.openIterator("A");
+     
+     int count = 0;
+     while (it.hasNext()) {
+       Tuple tuple = (Tuple) it.next();
+       if (tuple == null)
+         break;
+       else {
+         if (tuple.size() > 0) {
+           count++;
+           // Make sure the returned text is a proper XML element
+           DocumentBuilder docBuilder =
+               DocumentBuilderFactory.newInstance().newDocumentBuilder();
+           Document doc = docBuilder.parse(new ByteArrayInputStream(((String)tuple.get(0)).getBytes()));
+           assertTrue(doc.getDocumentElement().getNodeName().equals(tagName));
+         }
+       }
+     }
+     assertEquals(matchingCount, count);
+   }
 }

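A note for reviewers on the two tests above: both rely on shrinking the local
block and split sizes so that even a small file is read through several input
splits, which is what exercises XMLLoader's handling of tags at split
boundaries. A minimal sketch of that setup, using the same property names and
the test class's LOCAL constant ('long-file.xml' is a hypothetical input):

    // Force multiple splits over a small file, then load it with XMLLoader.
    Configuration conf = new Configuration();
    conf.setLong("fs.local.block.size", 512);    // tiny local FS blocks
    conf.setLong("mapred.max.split.size", 512);  // cap each input split
    PigServer pig = new PigServer(LOCAL, conf);
    pig.registerQuery("A = LOAD 'long-file.xml' USING "
        + "org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);");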
Modified: pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Apr 29 17:36:49 2014
@@ -119,4 +119,14 @@ public class HadoopShims {
     public static void unsetConf(Configuration conf, String key) {
         // Not supported in Hadoop 0.20/1.x
     }
+    
+    /**
+     * Fetch mode needs to set the task id explicitly; otherwise Hadoop sets it.
+     * @param conf
+     * @param taskAttemptID
+     */
+    public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
+        conf.set("mapred.task.id", taskAttemptID.toString());
+    }
+    
 }

Modified: pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Apr 29 17:36:49 2014
@@ -119,4 +119,14 @@ public class HadoopShims {
     public static void unsetConf(Configuration conf, String key) {
         conf.unset(key);
     }
+    
+    /**
+     * Fetch mode needs to set the task id explicitly; otherwise Hadoop sets it.
+     * @param conf
+     * @param taskAttemptID
+     */
+    public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
+        conf.setInt("mapreduce.job.application.attempt.id", taskAttemptID.getId());
+    }
+    
 }

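For context: both shim variants above are called from FetchLauncher (see the
change to FetchLauncher.java further down). The hadoop20 shim records the
string form under mapred.task.id, while the hadoop23 shim stores the numeric
attempt id under mapreduce.job.application.attempt.id. The call sequence, as
it appears in FetchLauncher.init():

    // getNewTaskAttemptID() is an existing HadoopShims factory method.
    TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
    HadoopShims.setTaskAttemptId(conf, taskAttemptID);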
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java Tue Apr 29 17:36:49 2014
@@ -542,7 +542,6 @@ public abstract class AbstractAccumuloSt
                 org.apache.accumulo.trace.instrument.Tracer.class,
                 org.apache.accumulo.core.client.Instance.class,
                 org.apache.accumulo.fate.Fate.class,
-                org.apache.accumulo.server.tabletserver.TabletServer.class,
                 org.apache.zookeeper.ZooKeeper.class,
                 org.apache.thrift.TServiceClient.class);
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Tue Apr 29 17:36:49 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -34,6 +35,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
 import org.apache.pig.data.SchemaTupleBackend;
@@ -71,17 +74,22 @@ public class FetchLauncher {
      * @throws IOException
      */
     public PigStats launchPig(PhysicalPlan pp) throws IOException {
-        POStore poStore = (POStore) pp.getLeaves().get(0);
-        init(pp, poStore);
-
-        // run fetch
-        runPipeline(poStore);
+        try {
+            POStore poStore = (POStore) pp.getLeaves().get(0);
+            init(pp, poStore);
+
+            // run fetch
+            runPipeline(poStore);
+
+            UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
+                    new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
+            udfFinisher.visit();
 
-        UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
-                new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
-        udfFinisher.visit();
-
-        return PigStats.start(new EmptyPigStats(pigContext, poStore));
+            return PigStats.start(new EmptyPigStats(pigContext, poStore));
+        }
+        finally {
+            UDFContext.getUDFContext().addJobConf(null);
+        }
     }
 
     /**
@@ -112,10 +120,18 @@ public class FetchLauncher {
     private void init(PhysicalPlan pp, POStore poStore) throws IOException {
         poStore.setStoreImpl(new FetchPOStoreImpl(pigContext));
         poStore.setUp();
+
+        TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
+        HadoopShims.setTaskAttemptId(conf, taskAttemptID);
+
         if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
             MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
         }
 
+        String currentTime = Long.toString(System.currentTimeMillis());
+        conf.set("pig.script.submitted.timestamp", currentTime);
+        conf.set("pig.job.submitted.timestamp", currentTime);
+
         PhysicalOperator.setReporter(new FetchProgressableReporter());
         SchemaTupleBackend.initialize(conf, pigContext);
 
@@ -125,13 +141,13 @@ public class FetchLauncher {
         udfContext.serialize(conf);
 
         PigMapReduce.sJobConfInternal.set(conf);
-        String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
+        String dtzStr = conf.get("pig.datetime.default.tz");
         if (dtzStr != null && dtzStr.length() > 0) {
             // ensure that the internal timezone is uniformly in UTC offset style
             DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
         }
 
-        boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+        boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
         PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
         pigStatusReporter.setContext(new TaskContext<FetchContext>(new FetchContext()));
         PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();

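The timestamps set in init() above are what make time-dependent UDFs behave in
fetch mode as they do in MR mode. A short sketch, assuming a UDF reads the job
configuration through UDFContext (the new TestFetch case near the end of this
diff verifies this path via CurrentTime):

    // Inside a UDF: read the submission timestamp set by FetchLauncher.
    Configuration jobConf = UDFContext.getUDFContext().getJobConf();
    String submitted = jobConf.get("pig.job.submitted.timestamp");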
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java Tue Apr 29 17:36:49 2014
@@ -28,7 +28,7 @@ import org.apache.pig.classification.Int
 @InterfaceStability.Evolving
 public interface PigLogger {
     /**
-     * If you have warning messages that need aggregation
+     * If you have warning messages that need aggregation 
      */
     public void warn(Object o, String msg, Enum<?> warningEnum);
 }

Modified: pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java Tue Apr 29 17:36:49 2014
@@ -71,7 +71,7 @@ public class AddDuration extends EvalFun
 
     @Override
     public DateTime exec(Tuple input) throws IOException {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
         

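The same two-operand null guard is applied to each of the datetime UDFs that
follow (DaysBetween through YearsBetween): a missing tuple, a short tuple, or
a null operand now yields a null result instead of a NullPointerException. A
compact equivalent, shown only to make the repeated condition easier to review
(hasBothOperands is a hypothetical helper, not part of the patch):

    // Equivalent to the guard added in each exec() method above and below.
    private static boolean hasBothOperands(Tuple input) throws ExecException {
        return input != null && input.size() >= 2
                && input.get(0) != null && input.get(1) != null;
    }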
Modified: pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class DaysBetween extends EvalFun
     @Override
     public Long exec(Tuple input) throws IOException
     {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java Tue Apr 29 17:36:49 2014
@@ -77,7 +77,7 @@ public class HoursBetween extends EvalFu
     @Override
     public Long exec(Tuple input) throws IOException
     {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java Tue Apr 29 17:36:49 2014
@@ -77,7 +77,7 @@ public class MilliSecondsBetween extends
     @Override
     public Long exec(Tuple input) throws IOException
     {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class MinutesBetween extends Eval
     @Override
     public Long exec(Tuple input) throws IOException
     {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class MonthsBetween extends EvalF
     @Override
     public Long exec(Tuple input) throws IOException
     {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class SecondsBetween extends Eval
     @Override
     public Long exec(Tuple input) throws IOException
     {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java Tue Apr 29 17:36:49 2014
@@ -71,7 +71,7 @@ public class SubtractDuration extends Ev
 
     @Override
     public DateTime exec(Tuple input) throws IOException {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
         

Modified: pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class WeeksBetween extends EvalFu
     @Override
     public Long exec(Tuple input) throws IOException
     {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class YearsBetween extends EvalFu
     @Override
     public Long exec(Tuple input) throws IOException
     {
-        if (input == null || input.size() < 2) {
+        if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
             return null;
         }
 

Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java Tue Apr 29 17:36:49 2014
@@ -280,7 +280,10 @@ public class ProjectExpression extends C
             return fieldSchema;
         LogicalRelationalOperator referent = findReferent();
 
-        LogicalSchema schema = referent.getSchema();
+        LogicalSchema schema = null;
+        if (referent.getSchema()!=null) {
+            schema = referent.getSchema().deepCopy();
+        }
 
         if (attachedRelationalOp instanceof LOGenerate && plan.getSuccessors(this)==null) {
             if (!(findReferent() instanceof LOInnerLoad)||

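The deep copy above guards against schema aliasing: ProjectExpression used to
hand back the referent's own LogicalSchema object, so an in-place change to
the projected schema also rewrote what the referent (and any sibling
projection) reported; this is the sharing bug covered by the PIG-3909 test
case in TestEvalPipelineLocal below. A small sketch of the hazard, assuming
LogicalSchema.deepCopy() performs a full structural clone:

    // With a shared reference, a mutation is visible everywhere; a deep
    // copy keeps the projection's view independent of the referent's.
    LogicalSchema shared = referent.getSchema();
    LogicalSchema independent = shared.deepCopy();
    shared.getField(0).type = DataType.LONG;  // in-place mutation
    // 'independent' still reports the field's original type.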
Modified: pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Tue Apr 29 17:36:49 2014
@@ -261,7 +261,7 @@ TOKEN_MGR_DECLS : {
 	<"'"> {prevState = PIG_START;} : IN_STRING
 |	<"`"> {prevState = PIG_START;} : IN_COMMAND
 |	<(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = PIG_START;} : SCHEMA_DEFINITION
-|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+ > {prevState = PIG_START;} : GENERATE
+|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "r" | "\n")+ > {prevState = PIG_START;} : GENERATE
 |       <"{"> {pigBlockLevel = 1;} : IN_BLOCK
 |       <"}"> {if (true) throw new TokenMgrError("Unmatched '}'", TokenMgrError.LEXICAL_ERROR);}
 |       <";"> : PIG_END
@@ -364,7 +364,7 @@ TOKEN_MGR_DECLS : {
 {
 	<"\""> {prevState = IN_BLOCK;} : IN_DOUBLE_QUOTED_STRING
 |	<(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = IN_BLOCK;} : SCHEMA_DEFINITION
-|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+> {prevState = IN_BLOCK;} : GENERATE
+|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "r" | "\n")+> {prevState = IN_BLOCK;} : GENERATE
 |	<"{"> {pigBlockLevel++;}
 |       <"}"(";")?> {pigBlockLevel--; if (pigBlockLevel == 0) SwitchTo(PIG_END);}
 |	<"'"> {prevState = IN_BLOCK;} : IN_STRING

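The GENERATE token change above relaxes the lexer so that the keyword may be
followed by a carriage return or newline, not just spaces and tabs; previously
the token failed to match when GENERATE was immediately followed by a line
break, so the lexer never entered the GENERATE state for multi-line
statements. The new TestMain case below exercises exactly this shape of
script:

    B = foreach A generate
        b,
        (bag{tuple(long)}) a.x as ax:{(x:long)};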
Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
  Merged /pig/trunk/src/pig-default.properties:r1586886-1591006

Modified: pig/branches/tez/test/org/apache/pig/TestMain.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/TestMain.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/TestMain.java (original)
+++ pig/branches/tez/test/org/apache/pig/TestMain.java Tue Apr 29 17:36:49 2014
@@ -24,8 +24,10 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.BufferedWriter;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileWriter;
+import java.io.FileReader;
 import java.io.IOException;
 import java.util.Properties;
 
@@ -35,6 +37,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.parser.ParserException;
 import org.apache.pig.parser.SourceLocation;
 import org.apache.pig.test.TestPigRunner.TestNotificationListener;
+import org.apache.pig.test.Util;
 import org.apache.pig.tools.parameters.ParameterSubstitutionException;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.Test;
@@ -126,6 +129,35 @@ public class TestMain {
         }
     }
 
+    @Test
+    public void testParseInputScript() throws Exception {
+        File input = Util.createInputFile("tmp", "",
+                new String[]{"{(1,1.0)}\ttestinputstring1",
+                        "{(2,2.0)}\ttestinputstring1",
+                        "{(3,3.0)}\ttestinputstring1",
+                        "{(4,4.0)}\ttestinputstring1"}
+        );
+        File out = new File(System.getProperty("java.io.tmpdir")+"/testParseInputScriptOut");
+        File scriptFile = Util.createInputFile("pigScript", "",
+                new String[]{"A = load '"+input.getAbsolutePath()+"' as (a:{(x:chararray, y:float)}, b:chararray);",
+                        "B = foreach A generate\n" +
+                                "    b,\n" +
+                                "    (bag{tuple(long)}) a.x as ax:{(x:long)};",
+                        "store B into '"+out.getAbsolutePath()+"';"}
+        );
+
+        Main.run(new String[]{"-x", "local", scriptFile.getAbsolutePath()}, null);
+        BufferedReader file = new BufferedReader(new FileReader(new File(out.getAbsolutePath()+"/part-m-00000")));
+        String line;
+        int count = 0;
+        while(( line = file.readLine()) != null) {
+            count++;
+        }
+        assertEquals(4,count);
+        Util.deleteDirectory(new File(out.getAbsolutePath()));
+        assertTrue(!new File(out.getAbsolutePath()).exists());
+    }
+
     public static class TestNotificationListener2 extends TestNotificationListener {
         protected boolean hadArgs = false;
         public TestNotificationListener2() {}

Modified: pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java Tue Apr 29 17:36:49 2014
@@ -1216,4 +1216,19 @@ public class TestEvalPipelineLocal {
         Schema s = pigServer.dumpSchema("final");
         Assert.assertEquals(s.toString(), "{n1::x3: int}");
     }
+    
+    // see PIG-3909
+    @Test
+    public void testCastSchemaShare() throws Exception{
+        File f1 = createFile(new String[]{"{([fieldkey1#polisan,fieldkey2#lily])}"});
+        
+        pigServer.registerQuery("A = load '" + Util.encodeEscape(Util.generateURI(f1.toString(), pigServer.getPigContext()))
+                + "' as (bagofmap:{});");
+        pigServer.registerQuery("B = foreach A generate FLATTEN((IsEmpty(bagofmap) ? null : bagofmap)) AS bagofmap;");
+        pigServer.registerQuery("C = filter B by (chararray)bagofmap#'fieldkey1' matches 'po.*';");
+        pigServer.registerQuery("D = foreach C generate (chararray)bagofmap#'fieldkey2';");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        Assert.assertEquals(iter.next().toString(), "(lily)");
+    }
 }

Modified: pig/branches/tez/test/org/apache/pig/test/TestFetch.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestFetch.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestFetch.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestFetch.java Tue Apr 29 17:36:49 2014
@@ -18,14 +18,18 @@
 
 package org.apache.pig.test;
 
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.PrintStream;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
@@ -39,17 +43,21 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.parser.ParserTestingUtils;
 import org.apache.pig.test.utils.GenPhyOp;
+import org.joda.time.DateTime;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+
 public class TestFetch {
 
     private PigServer pigServer;
@@ -87,6 +95,8 @@ public class TestFetch {
     @Before
     public void setUp() throws Exception{
         pigServer = new PigServer(ExecType.LOCAL, new Properties());
+        // force direct fetch mode
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.OPT_FETCH, "true");
     }
 
     @Test
@@ -279,6 +289,26 @@ public class TestFetch {
 
     }
 
+    /**
+     * Tests whether 'pig.job.submitted.timestamp' has been set by FetchLauncher
+     * @throws Exception
+     */
+    @Test
+    public void test7() throws Exception {
+        Data data = resetData(pigServer);
+
+        List<Tuple> justSomeRows = Lists.newArrayListWithCapacity(1);
+        justSomeRows.add(tuple(1));
+        data.set("justSomeRows", justSomeRows);
+
+        pigServer.registerQuery("A = load 'justSomeRows' using mock.Storage();");
+        pigServer.registerQuery("B = foreach A generate CurrentTime();");
+        Iterator<Tuple> it = pigServer.openIterator("B");
+        DateTime received = (DateTime) it.next().get(0);
+        // any returned result indicates that the property was set correctly
+        assertNotNull(received);
+    }
+
     @AfterClass
     public static void tearDownOnce() throws Exception {
         inputFile1.delete();