You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/29 19:36:50 UTC
svn commit: r1591026 [1/2] - in /pig/branches/tez: ./ bin/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
contrib/piggybank/java/src/test/java/org/apach...
Author: cheolsoo
Date: Tue Apr 29 17:36:49 2014
New Revision: 1591026
URL: http://svn.apache.org/r1591026
Log:
Merge latest trunk changes
Added:
pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/
- copied from r1591006, pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/
pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2
- copied unchanged from r1591006, pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/test-file-2.xml.bz2
Modified:
pig/branches/tez/ (props changed)
pig/branches/tez/CHANGES.txt
pig/branches/tez/bin/pig.cmd
pig/branches/tez/build.xml
pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java
pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java
pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java
pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java
pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java
pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java
pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java
pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java
pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java
pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java
pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
pig/branches/tez/src/pig-default.properties (props changed)
pig/branches/tez/test/org/apache/pig/TestMain.java
pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java
pig/branches/tez/test/org/apache/pig/test/TestFetch.java
pig/branches/tez/test/org/apache/pig/test/TestMRJobStats.java
pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2 (props changed)
pig/branches/tez/test/perf/pigmix/bin/runpigmix.pl
Propchange: pig/branches/tez/
------------------------------------------------------------------------------
Merged /pig/trunk:r1586886-1591006
Modified: pig/branches/tez/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/tez/CHANGES.txt?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/CHANGES.txt (original)
+++ pig/branches/tez/CHANGES.txt Tue Apr 29 17:36:49 2014
@@ -22,6 +22,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
+PIG-3898: Refactor PPNL for non-MR execution engine (cheolsoo)
+
PIG-3485: Remove CastUtils.bytesToMap(byte[] b) method from LoadCaster interface (cheolsoo)
PIG-3419: Pluggable Execution Engine (achalsoni81 via cheolsoo)
@@ -30,6 +32,16 @@ PIG-2207: Support custom counters for ag
IMPROVEMENTS
+PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
+
+PIG-3865: Remodel the XMLLoader to work to be faster and more maintainable (aseldawy via daijy)
+
+PIG-3737: Bundle dependent jars in distribution in %PIG_HOME%/lib folder (daijy)
+
+PIG-3771: Piggybank Avrostorage makes a lot of namenode calls in the backend (rohini)
+
+PIG-3851: Upgrade jline to 2.11 (daijy)
+
PIG-3884: Move multi store counters to PigStatsUtil from MRPigStatsUtil (rohini)
PIG-3591: Refactor POPackage to separate MR specific code from packaging (mwagner via cheolsoo)
@@ -105,6 +117,24 @@ PIG-3882: Multiquery off mode execution
BUG FIXES
+PIG-3909: Type Casting issue (daijy)
+
+PIG-3905: 0.12.1 release can't be build for Hadoop2 (daijy)
+
+PIG-3894: Datetime function AddDuration, SubtractDuration and all Between functions don't check for null values in the input tuple (jennythompson via cheolsoo)
+
+PIG-3772: Syntax error when casting an inner schema of a bag and line break involved (ssvinarchukhorton via daijy)
+
+PIG-3889: Direct fetch doesn't set job submission timestamps (cheolsoo)
+
+PIG-3895: Pigmix run script has compilation error (rohini)
+
+PIG-3885: AccumuloStorage incompatible with Accumulo 1.6.0 (elserj via daijy)
+
+PIG-3888: Direct fetch doesn't differentiate between frontend and backend sides (lbendig via daijy)
+
+PIG-3887: TestMRJobStats is broken in trunk (cheolsoo)
+
PIG-3868: Fix Iterator_1 e2e test on windows (ssvinarchukhorton via rohini)
PIG-3871: Replace org.python.google.* with com.google.* in imports (cheolsoo)
Modified: pig/branches/tez/bin/pig.cmd
URL: http://svn.apache.org/viewvc/pig/branches/tez/bin/pig.cmd?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/bin/pig.cmd (original)
+++ pig/branches/tez/bin/pig.cmd Tue Apr 29 17:36:49 2014
@@ -93,6 +93,9 @@ set PIGARGS=
for %%i in (%PIG_HOME%\*.jar) do (
set CLASSPATH=!CLASSPATH!;%%i
)
+ for %%i in (%PIG_HOME%\lib\*.jar) do (
+ set CLASSPATH=!CLASSPATH!;%%i
+ )
if not defined PIG_CONF_DIR (
set PIG_CONF_DIR=%PIG_HOME%\conf
)
Modified: pig/branches/tez/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/tez/build.xml?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/build.xml (original)
+++ pig/branches/tez/build.xml Tue Apr 29 17:36:49 2014
@@ -986,14 +986,24 @@
</copy>
<copy todir="${tar.dist.dir}/lib">
- <fileset dir="${ivy.lib.dir}" includes="jython*.jar"/>
- <fileset dir="${ivy.lib.dir}" includes="jruby*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="jython-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="jruby-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="groovy-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="js-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="hbase-*.jar" excludes="hbase-*tests.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="protobuf-java-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="zookeeper-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="accumulo-*.jar" excludes="accumulo-minicluster*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="avro-*.jar" excludes="avro-*tests.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="json-simple-*.jar"/>
</copy>
<copy file="${output.jarfile.backcompat.withouthadoop}" tofile="${tar.dist.dir}/${final.name}-withouthadoop.jar" />
<copy file="${output.jarfile.backcompat}" tofile="${tar.dist.dir}/${final.name}.jar" />
+ <copy todir="${tar.dist.dir}/lib" file="contrib/piggybank/java/piggybank.jar"/>
+
<copy todir="${tar.dist.dir}/" file="ivy.xml" />
<copy todir="${tar.dist.dir}/ivy">
@@ -1558,7 +1568,7 @@
<target name="ivy-resolve" depends="ivy-init" unless="ivy.resolved" description="Resolve Ivy dependencies">
<property name="ivy.resolved" value="true"/>
- <ivy:resolve settingsRef="${ant.project.name}.ivy.settings"/>
+ <ivy:resolve settingsRef="${ant.project.name}.ivy.settings" conf="compile"/>
</target>
<target name="ivy-compile" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for compile configuration">
Modified: pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/XMLLoader.java Tue Apr 29 17:36:49 2014
@@ -18,693 +18,303 @@
package org.apache.pig.piggybank.storage;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
+import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.tools.bzip2r.CBZip2InputStream;
-
-
-
/**
- * A <code>XMLLoaderBufferedPositionedInputStream</code> is the package class and is the
- * decorator over the BufferedPositionedInputStream which in turn decorate
- * BufferedInputStream. It contains <code>BufferedPositionedInputStream<code>
- * input stream, which it uses as
- * its basic source of data, possibly reading or providing additional
- * functionality. The class <code>XMLLoaderBufferedPositionedInputStream</code>
- * itself simply overrides the necessary medthod for reading i.e
- * <code>read</code> <code>getPosition<code> with versions that
- * pass all requests to the contained input
- * stream or do some special processing. Subclasses of <code>XMLLoaderBufferedPositionedInputStream</code>
- * may further override some of these methods
- * and may also provide additional methods
- * and fields.
- * It also provides additional method <code>collectTag<collect> which will give the byte
- * array between the tag which is a xml record. i.e <tag> .*</tag> will be returned
+ * Parses an XML input file given a specified identifier of tags to be loaded.
+ * The output is a bag of XML elements where each element is returned as
+ * a chararray containing the text of the matched XML element including the
+ * start and end tags as well as the data between them. In case of nesting elements
+ * of the matching tags, only the top level one is returned.
*
- * @note we can't use the standard SAX or STAX parser as for a big xml
- * the intermittent hadoop block may not be the valid xml and hence those
- * parser may create pb.
- *
- * @since pig 2.0
*/
+public class XMLLoader extends LoadFunc {
-class XMLLoaderBufferedPositionedInputStream extends BufferedPositionedInputStream {
-
- public final static int S_START = 0;
- public final static int S_MATCH_PREFIX = 1;
- public final static int S_MATCH_TAG = 2;
+ /**
+ * Use this record reader to read XML tags out of a text file. It matches only
+ * the tags identified by an identifier configured through a call to
+ * {@link #setXMLIdentifier(String)}. If there are nested tags of the given
+ * identifier, only the top level one is returned which also includes all
+ * enclosed tags.
+ */
+ public static class XMLRecordReader extends RecordReader<LongWritable, Text> {
+ protected final RecordReader<LongWritable, Text> wrapped;
+ /**Regular expression for XML tag identifier*/
+ private static final String XMLTagNameRegExp = "[a-zA-Z\\_][0-9a-zA-Z\\-_]+";
/**
- * The input streamed to be filtered
+ * A regular expression that matches key parts in the XML text needed to
+ * correctly parse it and find matches of the given identifier
*/
- InputStream wrapperIn;
+ private Pattern identifiersPattern;
- /**
- * The field to know if the underlying buffer contains any more bytes
- */
- boolean _isReadable;
+ private LongWritable key;
+ private Text value;
- /**
- * The field set the maximum bytes that is readable by this instance of stream.
- */
- private long maxBytesReadable = 0;
-
- /**
- * The field denote the number of bytes read by this stream.
- */
- long bytesRead = 0;
-
- /**
- * Denotes the end of the current split location
- */
- long end = 0;
-
- /**
- * Creates a <code>XMLLoaderBufferedPositionedInputStream</code>
- * by assigning the argument <code>in</code>
- * to the field <code>this.wrapperIn</code> so as
- * to remember it for later use.
- *
- * @param in the underlying input stream,
- */
- public XMLLoaderBufferedPositionedInputStream(InputStream in){
- super(in);
- this.wrapperIn = in;
- setReadable(true);
- }
-
- /**
- * Creates a split aware <code>XMLLoaderBufferedPositionedInputStream</code>.
- * @param in the underlying input stream
- * @param start start location of the split
- * @param end end location of the split
- */
- public XMLLoaderBufferedPositionedInputStream(InputStream in,long start,long end){
- this(in);
- this.end = end;
- maxBytesReadable = end - start;
- }
+ /**Position of the current buffer in the file*/
+ private long bufferPos;
- /**
- * Set the stream readable or non readable. This is needed
- * to control the xml parsing.
- * @param flag The boolean flag to be set
- * @see XMLLoaderBufferedPositionedInputStream#isReadable
- */
- private void setReadable(boolean flag) {
- _isReadable = flag;
- }
+ /**Holds parts of the input file that were read but not parsed yet*/
+ private String buffer;
- /**
- * See if the stream readable or non readable. This is needed
- * to control the xml parsing.
- * @return true if readable otherwise false
- * @see XMLLoaderBufferedPositionedInputStream#setReadable
- */
- public boolean isReadable() {
- return _isReadable == true;
- }
+ /**Original end of the split to parse*/
+ private long originalEnd;
- /**
- * org.apache.pig.impl.io.BufferedPositionedInputStream.read
- * It is just the wrapper for now.
- * Reads the next byte of data from this input stream. The value
- * byte is returned as an <code>int</code> in the range
- * <code>0</code> to <code>255</code>. If no byte is available
- * because the end of the stream has been reached, the value
- * <code>-1</code> is returned. This method blocks until input data
- * is available, the end of the stream is detected, or an exception
- * is thrown.
- * <p>
- * This method
- * simply performs <code>in.read()</code> and returns the result.
- *
- * @return the next byte of data, or <code>-1</code> if the end of the
- * stream is reached.
- * @exception IOException if an I/O error occurs.
- * @see XMLLoaderBufferedPositionedInputStreamInputStream#wrapperIn
- */
- public int read() throws IOException {
- return wrapperIn.read();
- }
-
- /**
- * This is collect the bytes from current position to the ending tag.
- * This scans for the tags and do the pattern match byte by byte
- * this must be used along with
- * XMLLoaderBufferedPositionedInputStream#skipToTag
- *
- * @param tagName the end tag to search for
- *
- * @param limit the end pointer for the block for this mapper
- *
- * @return the byte array containing the documents until the end of tag
- *
- * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
- *
- */
- private byte[] collectUntilEndTag(String tagName, long limit) {
+ private boolean terminated;
- //@todo use the charset and get the charset encoding from the xml encoding.
- byte[] tmp = tagName.getBytes();
- ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
- // Levels of elements we went inside matched node
- int depth = 0;
-
- //Since skipToTag was called before this function, we know that we are
- //currently inside the matched tag. Assuming the XML file is well
- //structured, we read till we encounter the first close tag. Since
- //the matched element might contain nested element, we keep track of the
- //current depth and terminate only when we encounter a closing tag at
- //level zero
-
- // A flag to indicate the parsing is currently inside a (start/end) tag
- boolean insideTag = false;
- // A flag to indicate that the current tag is a closing (end) tag
- boolean closingTag = false;
-
- // Last byte read
- int last_b = -1;
- while (true) {
- int b = -1;
- try {
- b = this.read();
- ++bytesRead; // Add one to the bytes read
- if (b == -1) {
- collectBuf.reset();
- this.setReadable(false);
- break;
- }
- collectBuf.write((byte)(b));
-
- // Check if the start tag has matched except for the last char
- if (b == '<') {
- insideTag = true;
- closingTag = false;
- } else if (b == '>') {
- // Detect the pattern />
- if (last_b == '/')
- closingTag = true;
- insideTag = false;
- if (closingTag) {
- if (depth == 0)
- break;
- depth--;
- }
- } else if (b == '/' && last_b == '<') {
- // Detected the pattern </
- closingTag = true;
- } else if (insideTag && last_b == '<') {
- // First character after '<' which is not a '/'
- depth++;
- }
- }
- catch (IOException e) {
- this.setReadable(false);
- return null;
- }
- last_b = b;
- }
- return collectBuf.toByteArray();
+ public XMLRecordReader(RecordReader<LongWritable, Text> wrapped) {
+ this.wrapped = wrapped;
}
/**
- * This is collect the from the matching tag.
- * This scans for the tags and do the pattern match byte by byte
- * This returns a part doc. it must be used along with
- * XMLLoaderBufferedPositionedInputStream#collectUntilEndTag
- *
- * @param tagName the start tag to search for
- *
- * @param limit the end pointer for the block for this mapper
- *
- * @return the byte array containing match of the tag.
- *
- * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
- *
+ * Delegate the initialization to the wrapped record reader after extending
+ * the split to at most ten times its original length, capped at the end of the file.
*/
- private byte[] skipToTag(String tagName, long limit) throws IOException {
-
- //@todo use the charset and get the charset encoding from the xml encoding.
- byte[] tmp = tagName.getBytes();
- byte[] tag = new byte[tmp.length + 1];
- tag[0] = (byte)'<';
- for (int i = 0; i < tmp.length; ++i) {
- tag[1+i] = tmp[i];
- }
-
- ByteArrayOutputStream matchBuf = new ByteArrayOutputStream(512);
- int idxTagChar = 0;
- int state = S_START;
-
- /*
- * Read till the tag is found in this block. If a partial tag block is found
- * then continue on to the next block.matchBuf contains the data that is currently
- * matched. If the read has reached the end of split and there are matched data
- * then continue on to the next block.
- */
- while (splitBoundaryCriteria(wrapperIn) || (matchBuf.size() > 0 )) {
- int b = -1;
- try {
- b = this.read();
- ++bytesRead; // Increment the bytes read by 1
- if (b == -1) {
- state = S_START;
- matchBuf.reset();
- this.setReadable(false);
- break;
- }
- switch (state) {
- case S_START:
- // start to match the target open tag
- if (b == tag[idxTagChar]) {
- ++idxTagChar;
- matchBuf.write((byte)(b));
- if (idxTagChar == tag.length) {
- state = S_MATCH_PREFIX;
- }
- } else { // mismatch
- idxTagChar = 0;
- matchBuf.reset();
- }
- break;
- case S_MATCH_PREFIX:
- // tag match iff next character is whitespaces or close tag mark
- if (Character.isWhitespace(b) || b == '/' || b == '>') {
- matchBuf.write((byte)(b));
- state = S_MATCH_TAG;
- } else {
- idxTagChar = 0;
- matchBuf.reset();
- state = S_START;
- }
- break;
- case S_MATCH_TAG:
- // keep copy characters until we hit the close tag mark
- matchBuf.write((byte)(b));
- break;
- default:
- throw new IllegalArgumentException("Invalid state: " + state);
- }
- if (state == S_MATCH_TAG && (b == '>' || Character.isWhitespace(b))) {
- break;
- }
- if (state != S_MATCH_TAG && this.getPosition() > limit) {
- // need to break, no record in this block
- break;
- }
- }
- catch (IOException e) {
- this.setReadable(false);
- return null;
- }
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ key = new LongWritable();
+ value = new Text();
+ if (split instanceof FileSplit) {
+ FileSplit fsplit = (FileSplit) split;
+ originalEnd = fsplit.getStart() + fsplit.getLength();
+ Path path = fsplit.getPath();
+ long fileEnd = path.getFileSystem(context.getConfiguration()).getFileStatus(path).getLen();
+ FileSplit extendedSplit = new FileSplit(path, fsplit.getStart(),
+ Math.min(fsplit.getLength() * 10, fileEnd - fsplit.getStart()), fsplit.getLocations());
+ this.wrapped.initialize(extendedSplit, context);
+ } else {
+ throw new RuntimeException("Cannot override a split of type'"+
+ split.getClass()+"'");
}
- return matchBuf.toByteArray();
- }
- /**
- * Returns whether the split boundary condition has reached or not.
- * For normal files ; the condition is to read till the split end reaches.
- * Gz files will have maxBytesReadable set to near Long.MAXVALUE, hence
- * this will cause the entire file to be read. For bz2 and bz files, the
- * condition lies on the position which until which it is read.
- *
- * @param wrapperIn2
- * @return true/false depending on whether split boundary has reached or no
- * @throws IOException
- */
- private boolean splitBoundaryCriteria(InputStream wrapperIn2) throws IOException {
- if(wrapperIn2 instanceof CBZip2InputStream)
- return ((CBZip2InputStream)wrapperIn2).getPos() <= end;
- else
- return bytesRead <= maxBytesReadable;
}
- /**
- * This is collect bytes from start and end tag both inclusive
- * This scans for the tags and do the pattern match byte by byte
- *
- * @param tagName the start tag to search for
- *
- * @param limit the end pointer for the block for this mapper
- *
- * @return the byte array containing match of the <code><tag>.*</tag><code>.
- *
- * @see loader.XMLLoaderBufferedPositionedInputStream.skipToTag
- *
- * @see loader.XMLLoaderBufferedPositionedInputStream.collectUntilEndTag
- *
- */
- byte[] collectTag(String tagName, long limit) throws IOException {
- ByteArrayOutputStream collectBuf = new ByteArrayOutputStream(1024);
- byte[] beginTag = skipToTag(tagName, limit);
-
- // Check if the tag is closed inline
- if (beginTag.length > 2 && beginTag[beginTag.length - 2] == '/' &&
- beginTag[beginTag.length-1] == '>') {
- return beginTag;
- }
-
- // No need to search for the end tag if the start tag is not found
- if(beginTag.length > 0 ){
- byte[] untilTag = collectUntilEndTag(tagName, limit);
- if (untilTag.length > 0) {
- for (byte b: beginTag) {
- collectBuf.write(b);
- }
- for (byte b: untilTag) {
- collectBuf.write(b);
- }
- }
- }
- return collectBuf.toByteArray();
+ public void setXMLIdentifier(String identifier) {
+ if (!identifier.matches(XMLTagNameRegExp))
+ throw new RuntimeException("XML tag identifier '"+identifier+"' does not match the regular expression /"+XMLTagNameRegExp+"/");
+ String inlineClosedTagRegExp = "<\\s*"+identifier+"\\s*[^>]*/>";
+ String openTagRegExp = "<\\s*"+identifier+"(?:\\s*|\\s+(?:[^/>]*|[^>]*[^>/]))>";
+ String closeTagRegExp = "</\\s*"+identifier+"\\s*>";
+ identifiersPattern = Pattern.compile("("+inlineClosedTagRegExp+")|("+openTagRegExp+")|("+closeTagRegExp+")");
}
-}
-
-
-/**
- * The load function to load the XML file
- * This implements the LoadFunc interface which is used to parse records
- * from a dataset. The various helper adaptor function is extended from loader.Utf8StorageConverter
- * which included various functions to cast raw byte data into various datatypes.
- * other sections of the code can call back to the loader to do the cast.
- * This takes a xmlTag as the arg which it will use to split the inputdataset into
- * multiple records.
- * <code>
- *
- * For example if the input xml (input.xml) is like this
- * <configuration>
- * <property>
- * <name> foobar </name>
- * <value> barfoo </value>
- * </property>
- * <ignoreProperty>
- * <name> foo </name>
- * </ignoreProperty>
- * <property>
- * <name> justname </name>
- * </property>
- * </configuration>
- *
- * And your pig script is like this
- *
- * --load the jar files
- * register /homes/aloks/pig/udfLib/loader.jar;
- * -- load the dataset using XMLLoader
- * -- A is the bag containing the tuple which contains one atom i.e doc see output
- * A = load '/user/aloks/pig/input.xml using loader.XMLLoader('property') as (doc:chararray);
- * --dump the result
- * dump A;
- *
- *
- * Then you will get the output
- *
- * (<property>
- * <name> foobar </name>
- * <value> barfoo </value>
- * </property>)
- * (<property>
- * <name> justname </name>
- * </property>)
- *
- *
- * Where each () indicate one record
- *
- *
- * </code>
- */
-
-public class XMLLoader extends LoadFunc {
-
- /**
- * logger from pig
- */
- protected final Log mLog = LogFactory.getLog(getClass());
-
- private XMLFileRecordReader reader = null;
-
-
- /**
- * the tuple content which is used while returning
- */
- private ArrayList<Object> mProtoTuple = null;
-
- /**
- * The record seperated. The default value is 'document'
- */
- public String recordIdentifier = "document";
-
- private String loadLocation;
-
- public XMLLoader() {
-
+ /* Delegate all methods to the wrapped stream */
+ public void close() throws IOException {
+ wrapped.close();
}
- /**
- * Constructs a Pig loader that uses specified string as the record seperater
- * for example if the recordIdentifier is document. It will consider the record as
- * <document> .* </document>
- *
- * @param recordIdentifier the xml tag which is used to pull records
- *
- */
- public XMLLoader(String recordIdentifier) {
- this();
- this.recordIdentifier = recordIdentifier;
+ public boolean equals(Object obj) {
+ return wrapped.equals(obj);
}
- /**
- * Retrieves the next tuple to be processed.
- * @return the next tuple to be processed or null if there are no more tuples
- * to be processed.
- * @throws IOException
- */
- @Override
- public Tuple getNext() throws IOException {
-
- boolean next = false;
-
- try {
- next = reader.nextKeyValue();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
-
- if (!next) return null;
-
- Tuple t = null;
-
- try {
- byte[] tagContent = (byte[]) reader.getCurrentValue();
- // No need to create the tuple if there are no contents
- t = (tagContent.length > 0) ? createTuple(tagContent) : null;
- } catch (Exception e) {
- throw new IOException(e);
- }
-
- return t;
-
- }
-
- public Tuple createTuple(byte[] content) throws Exception {
- if (mProtoTuple == null) {
- mProtoTuple = new ArrayList<Object>();
- }
- if (content.length > 0) {
- mProtoTuple.add(new DataByteArray(content));
- }
- Tuple t = TupleFactory.getInstance().newTupleNoCopy(mProtoTuple);
- mProtoTuple = null;
-
- return t;
+ public LongWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
}
- /**
- * to check for equality
- * @param object
- */
- public boolean equals(Object obj) {
- return equals((XMLLoader)obj);
+ public Text getCurrentValue() throws IOException, InterruptedException {
+ return value;
}
- /**
- * to check for equality
- * @param XMLLoader object
- */
- public boolean equals(XMLLoader other) {
- return this.recordIdentifier.equals(other.recordIdentifier);
+ public float getProgress() throws IOException, InterruptedException {
+ return Math.max(1.0f, this.wrapped.getProgress() * 10);
}
- @SuppressWarnings("unchecked")
- @Override
- public InputFormat getInputFormat() throws IOException {
- XMLFileInputFormat inputFormat = new XMLFileInputFormat(recordIdentifier);
- if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
- inputFormat.isSplitable = true;
- }
- return inputFormat;
+ public int hashCode() {
+ return wrapped.hashCode();
}
- @SuppressWarnings("unchecked")
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split)
- throws IOException {
- this.reader = (XMLFileRecordReader) reader;
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (this.terminated)
+ return false;
+ int depth = 0;
+ // When an element is matched by an open tag and a close tag, this buffer
+ // is used to accumulate the matched element if it spans multiple lines.
+ StringBuffer currentMatch = new StringBuffer();
+ // The start offset of first matched open tag. This marks the first byte
+ // in the range to be copied to output.
+ int offsetOfFirstMatchedOpenTag = 0;
+ try {
+ while (true) {
+ while (buffer == null || buffer.length() == 0) {
+ if (!wrapped.nextKeyValue())
+ return false; // End of split
+ // if passed the end offset of current split, terminate the matching
+ if (bufferPos >= originalEnd && depth == 0) {
+ this.terminated = true;
+ return false;
+ }
+
+ bufferPos = wrapped.getCurrentKey().get();
+ buffer = wrapped.getCurrentValue().toString();
+ }
+ Matcher matcher = identifiersPattern.matcher(buffer);
+ while (matcher.find()) {
+ int startOfCurrentMatch = matcher.start();
+ int endOfCurrentMatch = matcher.end();
+ String group;
+ if ((group = matcher.group(1)) != null) {
+ // Matched an inline-closed tag
+ value = new Text(group);
+ this.key.set(bufferPos + matcher.start(1));
+ bufferPos += matcher.end(1);
+ buffer = buffer.substring(endOfCurrentMatch);
+ return true;
+ } else if ((group = matcher.group(2)) != null) {
+ // Matched an open tag
+ // If this is a top-level match (i.e., not enclosed in another matched
+ // tag), all bytes starting from this offset will be copied to output
+ // in one of two cases:
+ // 1- When a matching close tag is found
+ // 2- When an end of line is encountered
+ if (depth == 0) {
+ offsetOfFirstMatchedOpenTag = startOfCurrentMatch;
+ this.key.set(bufferPos + startOfCurrentMatch);
+ }
+ depth++;
+ } else if ((group = matcher.group(3)) != null) {
+ // Matched a closed tag
+ if (depth > 0) {
+ depth--;
+ if (depth == 0) {
+ // A full top-level match
+ // Copy all bytes to output
+ if (currentMatch.length() == 0) {
+ // A full match in one line, return it immediately
+ value = new Text(buffer.substring(offsetOfFirstMatchedOpenTag, endOfCurrentMatch));
+ } else {
+ currentMatch.append(buffer, offsetOfFirstMatchedOpenTag, endOfCurrentMatch);
+ value = new Text(currentMatch.toString());
+ }
+ // Copy remaining non matched part to the buffer for next call
+ buffer = buffer.substring(endOfCurrentMatch);
+ bufferPos += endOfCurrentMatch;
+ return true;
+ }
+ }
+ } else {
+ throw new RuntimeException("Invalid match '"+matcher.group()+"' in string '"+buffer+"'");
+ }
+ }
+ // No more matches in current line. If we are inside a match (i.e.,
+ // an open tag has been matched) copy all parts to the match.
+ // Otherwise, just drop it.
+ if (depth > 0) {
+ // Inside a match
+ currentMatch.append(buffer, offsetOfFirstMatchedOpenTag, buffer.length());
+ }
+ buffer = null;
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Error getting input");
+ }
+
}
- @Override
- public void setLocation(String location, Job job) throws IOException {
- loadLocation = location;
- FileInputFormat.setInputPaths(job, location);
- }
-
- //------------------------------------------------------------------------
- // Implementation of InputFormat
-
- public static class XMLFileInputFormat extends FileInputFormat {
-
- /**
- * Boolean flag used to identify whether splittable property is explicitly set.
- */
- private boolean isSplitable = false;
-
- private String recordIdentifier;
-
- public XMLFileInputFormat(String recordIdentifier) {
- this.recordIdentifier = recordIdentifier;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public RecordReader createRecordReader(InputSplit split,
- TaskAttemptContext context) throws IOException,
- InterruptedException {
-
- return new XMLFileRecordReader(recordIdentifier);
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path filename) {
- CompressionCodec codec =
- new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
- return (!(codec == null)) ? isSplitable : true;
- }
- }
-
- //------------------------------------------------------------------------
- // Implementation of RecordReader
-
- public static class XMLFileRecordReader extends RecordReader {
-
- private long start;
- private long end;
- private String recordIdentifier;
-
- /*
- * xmlloader input stream which has the ability to split the input
- * dataset into records by the specified tag
- */
- private XMLLoaderBufferedPositionedInputStream xmlLoaderBPIS = null;
-
- public XMLFileRecordReader(String recordIdentifier) {
- this.recordIdentifier = recordIdentifier;
- }
-
- @Override
- public void initialize(InputSplit genericSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- FileSplit split = (FileSplit) genericSplit;
- Configuration job = context.getConfiguration();
-
- start = split.getStart();
- end = start + split.getLength();
- final Path file = split.getPath();
-
- // open the file and seek to the start of the split
- FileSystem fs = file.getFileSystem(job);
- FSDataInputStream fileIn = fs.open(split.getPath());
-
- // Seek to the start of the file
- fileIn.seek(start);
-
- if(file.toString().endsWith(".bz2") || file.toString().endsWith(".bz"))
- {
- // For bzip2 files use CBZip2InputStream to read and supply the upper input stream.
- CBZip2InputStream in = new CBZip2InputStream(fileIn,9, end);
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(in,start,end);
- }
- else if (file.toString().endsWith(".gz"))
- {
- CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(job);
- final CompressionCodec codec = compressionCodecs.getCodec(file);
- if (codec != null) {
- end = Long.MAX_VALUE;
- CompressionInputStream stream = codec.createInputStream(fileIn);
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(stream,start,end);
- }
- }
-
- else
- {
- this.xmlLoaderBPIS = new XMLLoaderBufferedPositionedInputStream(fileIn,start,end);
- }
- }
-
-
- @Override
- public void close() throws IOException {
- xmlLoaderBPIS.close();
- }
-
- @Override
- public Object getCurrentKey() throws IOException, InterruptedException {
- return null;
- }
-
- @Override
- public Object getCurrentValue() throws IOException,
- InterruptedException {
- return xmlLoaderBPIS.collectTag(recordIdentifier, end);
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
-
- return 0;
- }
-
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- return xmlLoaderBPIS.isReadable();
- }
-
+ public String toString() {
+ return wrapped.toString();
}
+ }
+
+ /**Location of the file loaded*/
+ private String loadLocation;
+
+ /**Underlying record reader*/
+ @SuppressWarnings("rawtypes")
+ protected RecordReader in = null;
+
+ /**XML tag to parse*/
+ private String identifier;
+
+ public XMLLoader(String identifier) {
+ this.identifier = identifier;
+ }
+
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split)
+ throws IOException {
+ in = reader;
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ try {
+ if (!in.nextKeyValue())
+ return null;
+ Tuple tuple = createTuple(in.getCurrentValue().toString());
+ return tuple;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+
+ /**
+ * Creates a tuple from a matched string
+ */
+ public Tuple createTuple(String str) {
+ return TupleFactory.getInstance().newTuple(new DataByteArray(str));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public InputFormat getInputFormat() throws IOException {
+ if(loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
+ return new Bzip2TextInputFormat() {
+ @Override
+ public RecordReader<LongWritable, Text> createRecordReader(
+ InputSplit split, TaskAttemptContext context) {
+ try {
+ RecordReader<LongWritable, Text> originalReader =
+ super.createRecordReader(split, context);
+ XMLRecordReader reader = new XMLRecordReader(originalReader);
+ reader.setXMLIdentifier(identifier);
+ return reader;
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot create input split", e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Cannot create input split", e);
+ }
+ }
+ };
+ } else {
+ return new PigTextInputFormat() {
+ @Override
+ public RecordReader<LongWritable, Text> createRecordReader(
+ InputSplit split, TaskAttemptContext context) {
+ RecordReader<LongWritable, Text> originalReader =
+ super.createRecordReader(split, context);
+ XMLRecordReader reader = new XMLRecordReader(originalReader);
+ reader.setXMLIdentifier(identifier);
+ return reader;
+ }
+ };
+ }
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ loadLocation = location;
+ FileInputFormat.setInputPaths(job, location);
+ }
}
Modified: pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Tue Apr 29 17:36:49 2014
@@ -19,6 +19,7 @@ package org.apache.pig.piggybank.storage
import java.io.IOException;
import java.io.InputStream;
+import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -27,9 +28,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -53,6 +57,7 @@ import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -64,9 +69,13 @@ import org.json.simple.parser.ParseExcep
*/
public class AvroStorage extends FileInputLoadFunc implements StoreFuncInterface, LoadMetadata {
+ private static final Log LOG = LogFactory.getLog(AvroStorage.class);
/* storeFunc parameters */
private static final String NOTNULL = "NOTNULL";
private static final String AVRO_OUTPUT_SCHEMA_PROPERTY = "avro_output_schema";
+ private static final String AVRO_INPUT_SCHEMA_PROPERTY = "avro_input_schema";
+ private static final String AVRO_INPUT_PIG_SCHEMA_PROPERTY = "avro_input_pig_schema";
+ private static final String AVRO_MERGED_SCHEMA_PROPERTY = "avro_merged_schema_map";
private static final String SCHEMA_DELIM = "#";
private static final String SCHEMA_KEYVALUE_DELIM = "@";
private static final String NO_SCHEMA_CHECK = "no_schema_check";
@@ -145,12 +154,38 @@ public class AvroStorage extends FileInp
/**
* Set input location and obtain input schema.
*/
+ @SuppressWarnings("unchecked")
@Override
public void setLocation(String location, Job job) throws IOException {
if (inputAvroSchema != null) {
return;
}
+ if (!UDFContext.getUDFContext().isFrontend()) {
+ Properties udfProps = getUDFProperties();
+ String mergedSchema = udfProps.getProperty(AVRO_MERGED_SCHEMA_PROPERTY);
+ if (mergedSchema != null) {
+ HashMap<URI, Map<Integer, Integer>> mergedSchemaMap =
+ (HashMap<URI, Map<Integer, Integer>>) ObjectSerializer.deserialize(mergedSchema);
+ schemaToMergedSchemaMap = new HashMap<Path, Map<Integer, Integer>>();
+ for (Entry<URI, Map<Integer, Integer>> entry : mergedSchemaMap.entrySet()) {
+ schemaToMergedSchemaMap.put(new Path(entry.getKey()), entry.getValue());
+ }
+ }
+ String schema = udfProps.getProperty(AVRO_INPUT_SCHEMA_PROPERTY);
+ if (schema != null) {
+ try {
+ inputAvroSchema = new Schema.Parser().parse(schema);
+ return;
+ } catch (Exception e) {
+ // Cases like testMultipleSchemas2 cause exception while deserializing
+ // symbols. In that case, we fall through and construct the schema again.
+ LOG.warn("Exception while trying to deserialize schema in backend. " +
+ "Will construct again. schema= " + schema, e);
+ }
+ }
+ }
+
Configuration conf = job.getConfiguration();
Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
if (!paths.isEmpty()) {
@@ -158,10 +193,19 @@ public class AvroStorage extends FileInp
// bloat configuration size
FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
// Scan all directories including sub directories for schema
- setInputAvroSchema(paths, conf);
+ if (inputAvroSchema == null) {
+ setInputAvroSchema(paths, conf);
+ }
} else {
throw new IOException("Input path \'" + location + "\' is not found");
}
+
+ }
+
+ @Override
+ public void setUDFContextSignature(String signature) {
+ this.contextSignature = signature;
+ super.setUDFContextSignature(signature);
}
/**
@@ -284,7 +328,8 @@ public class AvroStorage extends FileInp
}
}
// schemaToMergedSchemaMap is only needed when merging multiple records.
- if (mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
+ if ((schemaToMergedSchemaMap == null || schemaToMergedSchemaMap.isEmpty()) &&
+ mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
schemaToMergedSchemaMap = AvroStorageUtils.getSchemaToMergedSchemaMap(result, mergedFiles);
}
return result;
@@ -398,6 +443,19 @@ public class AvroStorage extends FileInp
if (pigSchema.getFields().length == 1){
pigSchema = pigSchema.getFields()[0].getSchema();
}
+ Properties udfProps = getUDFProperties();
+ udfProps.put(AVRO_INPUT_SCHEMA_PROPERTY, inputAvroSchema.toString());
+ udfProps.put(AVRO_INPUT_PIG_SCHEMA_PROPERTY, pigSchema);
+ if (schemaToMergedSchemaMap != null) {
+ HashMap<URI, Map<Integer, Integer>> mergedSchemaMap = new HashMap<URI, Map<Integer, Integer>>();
+ for (Entry<Path, Map<Integer, Integer>> entry : schemaToMergedSchemaMap.entrySet()) {
+ //Path is not serializable
+ mergedSchemaMap.put(entry.getKey().toUri(), entry.getValue());
+ }
+ udfProps.put(AVRO_MERGED_SCHEMA_PROPERTY,
+ ObjectSerializer.serialize(mergedSchemaMap));
+ }
+
return pigSchema;
} else {
return null;
@@ -731,6 +789,7 @@ public class AvroStorage extends FileInp
@Override
public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
+ super.setUDFContextSignature(signature);
}
@Override
Modified: pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestXMLLoader.java Tue Apr 29 17:36:49 2014
@@ -17,21 +17,29 @@ import static org.apache.pig.ExecType.LO
import java.io.ByteArrayInputStream;
import java.io.File;
+import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Random;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import junit.framework.TestCase;
+import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.Util;
+import org.w3c.dom.Document;
public class TestXMLLoader extends TestCase {
- private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
public static ArrayList<String[]> data = new ArrayList<String[]>();
static {
data.add(new String[] { "<configuration>"});
@@ -89,13 +97,11 @@ public class TestXMLLoader extends TestC
inlineClosedTags.add(new String[] { "<event id='33'><tag k='a' v='b'/></event>"});
inlineClosedTags.add(new String[] { "</events>"});
}
-
public void testShouldReturn0TupleCountIfSearchTagIsNotFound () throws Exception
{
String filename = TestHelper.createTempFile(data, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('invalid') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -118,7 +124,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(data, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('property') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -175,7 +180,6 @@ public class TestXMLLoader extends TestC
}
}
-
public void testLoaderShouldLoadBasicGzFile() throws Exception {
String filename = TestHelper.createTempFile(data, "");
@@ -224,7 +228,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(testData, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('name') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -248,7 +251,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(data, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -265,14 +267,13 @@ public class TestXMLLoader extends TestC
}
assertEquals(1, tupleCount);
}
-
public void testShouldReturn0TupleCountIfNoEndTagIsFound() throws Exception
{
// modify the data content to avoid end tag for </ignoreProperty>
ArrayList<String[]> testData = new ArrayList<String[]>();
for (String content[] : data) {
- if(false == data.equals(new String[] { "</ignoreProperty>"}))
+ if(!content[0].equals("</ignoreProperty>"))
{
testData.add(content);
}
@@ -281,8 +282,7 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(testData, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
- String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+ String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
int tupleCount = 0;
@@ -307,8 +307,7 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(testData, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
- String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('</ignoreProperty>') as (doc:chararray);";
+ String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('ignoreProperty') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
int tupleCount = 0;
@@ -330,7 +329,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(nestedTags, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -353,7 +351,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(inlineClosedTags, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -375,7 +372,6 @@ public class TestXMLLoader extends TestC
String filename = TestHelper.createTempFile(inlineClosedTags, "");
PigServer pig = new PigServer(LOCAL);
filename = filename.replace("\\", "\\\\");
- patternString = patternString.replace("\\", "\\\\");
String query = "A = LOAD '" + filename + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
pig.registerQuery(query);
Iterator<?> it = pig.openIterator("A");
@@ -391,4 +387,108 @@ public class TestXMLLoader extends TestC
}
}
}
+
+ /**
+ * This test case tests the special case when a non-matching tag spans two file
+ * splits in a .bz2 compressed file. At the same time, the part that falls in
+ * the first split is a prefix of the matching tag.
+ * In other words, till the end of the first split, it looks like the tag is
+ * matching but it is not actually matching.
+ *
+ * @throws Exception
+ */
+ public void testXMLLoaderShouldNotReturnLastNonMatchedTag() throws Exception {
+ Configuration conf = new Configuration();
+ long blockSize = 100 * 1024;
+ conf.setLong("fs.local.block.size", blockSize);
+
+ String tagName = "event";
+
+ PigServer pig = new PigServer(LOCAL, conf);
+ FileSystem localFs = FileSystem.getLocal(conf);
+ FileStatus[] testFiles = localFs.globStatus(new Path("src/test/java/org/apache/pig/piggybank/test/evaluation/xml/data/*xml.bz2"));
+ assertTrue("No test files", testFiles.length > 0);
+ for (FileStatus testFile : testFiles) {
+ String testFileName = testFile.getPath().toUri().getPath().replace("\\", "\\\\");
+ String query = "A = LOAD '" + testFileName + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
+ pig.registerQuery(query);
+ Iterator<?> it = pig.openIterator("A");
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ if (tuple.size() > 0) {
+ assertTrue(((String)tuple.get(0)).startsWith("<"+tagName+">"));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * This test checks that a multi-line tag spanning two splits should be
+ * matched.
+ * @throws Exception
+ */
+ public void testXMLLoaderShouldMatchTagSpanningSplits() throws Exception {
+ Configuration conf = new Configuration();
+ long blockSize = 512;
+ conf.setLong("fs.local.block.size", blockSize);
+ conf.setLong("mapred.max.split.size", blockSize);
+
+ String tagName = "event";
+ File tempFile = File.createTempFile("long-file", ".xml");
+ FileSystem localFs = FileSystem.getLocal(conf);
+ FSDataOutputStream directOut = localFs.create(new Path(tempFile.getAbsolutePath()), true);
+
+ String matchingElement = "<event>\ndata\n</event>\n";
+ long pos = 0;
+ int matchingCount = 0;
+ PrintStream ps = new PrintStream(directOut);
+ // 1- Write some elements that fit completely in the first block
+ while (pos + 2 * matchingElement.length() < blockSize) {
+ ps.print(matchingElement);
+ pos += matchingElement.length();
+ matchingCount++;
+ }
+ // 2- Write a long element that spans multiple lines and multiple blocks
+ String longElement = matchingElement.replace("data",
+ "data\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\ndata\n");
+ ps.print(longElement);
+ pos += longElement.length();
+ matchingCount++;
+ // 3- Write some more elements to fill in the second block completely
+ while (pos < 2 * blockSize) {
+ ps.print(matchingElement);
+ pos += matchingElement.length();
+ matchingCount++;
+ }
+ ps.close();
+
+
+ PigServer pig = new PigServer(LOCAL, conf);
+ String tempFileName = tempFile.getAbsolutePath().replace("\\", "\\\\");
+ String query = "A = LOAD '" + tempFileName + "' USING org.apache.pig.piggybank.storage.XMLLoader('event') as (doc:chararray);";
+ pig.registerQuery(query);
+ Iterator<?> it = pig.openIterator("A");
+
+ int count = 0;
+ while (it.hasNext()) {
+ Tuple tuple = (Tuple) it.next();
+ if (tuple == null)
+ break;
+ else {
+ if (tuple.size() > 0) {
+ count++;
+ // Make sure the returned text is a proper XML element
+ DocumentBuilder docBuilder =
+ DocumentBuilderFactory.newInstance().newDocumentBuilder();
+ Document doc = docBuilder.parse(new ByteArrayInputStream(((String)tuple.get(0)).getBytes()));
+ assertTrue(doc.getDocumentElement().getNodeName().equals(tagName));
+ }
+ }
+ }
+ assertEquals(matchingCount, count);
+ }
}
Modified: pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/tez/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Apr 29 17:36:49 2014
@@ -119,4 +119,14 @@ public class HadoopShims {
public static void unsetConf(Configuration conf, String key) {
// Not supported in Hadoop 0.20/1.x
}
+
+ /**
+ * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+ * @param conf
+ * @param taskAttemptID
+ */
+ public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
+ conf.set("mapred.task.id", taskAttemptID.toString());
+ }
+
}
Modified: pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/branches/tez/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Apr 29 17:36:49 2014
@@ -119,4 +119,14 @@ public class HadoopShims {
public static void unsetConf(Configuration conf, String key) {
conf.unset(key);
}
+
+ /**
+ * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
+ * @param conf
+ * @param taskAttemptID
+ */
+ public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
+ conf.setInt("mapreduce.job.application.attempt.id", taskAttemptID.getId());
+ }
+
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/accumulo/AbstractAccumuloStorage.java Tue Apr 29 17:36:49 2014
@@ -542,7 +542,6 @@ public abstract class AbstractAccumuloSt
org.apache.accumulo.trace.instrument.Tracer.class,
org.apache.accumulo.core.client.Instance.class,
org.apache.accumulo.fate.Fate.class,
- org.apache.accumulo.server.tabletserver.TabletServer.class,
org.apache.zookeeper.ZooKeeper.class,
org.apache.thrift.TServiceClient.class);
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchLauncher.java Tue Apr 29 17:36:49 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -34,6 +35,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.backend.hadoop.executionengine.shims.TaskContext;
import org.apache.pig.data.SchemaTupleBackend;
@@ -71,17 +74,22 @@ public class FetchLauncher {
* @throws IOException
*/
public PigStats launchPig(PhysicalPlan pp) throws IOException {
- POStore poStore = (POStore) pp.getLeaves().get(0);
- init(pp, poStore);
-
- // run fetch
- runPipeline(poStore);
+ try {
+ POStore poStore = (POStore) pp.getLeaves().get(0);
+ init(pp, poStore);
+
+ // run fetch
+ runPipeline(poStore);
+
+ UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
+ new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
+ udfFinisher.visit();
- UDFFinishVisitor udfFinisher = new UDFFinishVisitor(pp,
- new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(pp));
- udfFinisher.visit();
-
- return PigStats.start(new EmptyPigStats(pigContext, poStore));
+ return PigStats.start(new EmptyPigStats(pigContext, poStore));
+ }
+ finally {
+ UDFContext.getUDFContext().addJobConf(null);
+ }
}
/**
@@ -112,10 +120,18 @@ public class FetchLauncher {
private void init(PhysicalPlan pp, POStore poStore) throws IOException {
poStore.setStoreImpl(new FetchPOStoreImpl(pigContext));
poStore.setUp();
+
+ TaskAttemptID taskAttemptID = HadoopShims.getNewTaskAttemptID();
+ HadoopShims.setTaskAttemptId(conf, taskAttemptID);
+
if (!PlanHelper.getPhysicalOperators(pp, POStream.class).isEmpty()) {
MapRedUtil.setupStreamingDirsConfSingle(poStore, pigContext, conf);
}
+ String currentTime = Long.toString(System.currentTimeMillis());
+ conf.set("pig.script.submitted.timestamp", currentTime);
+ conf.set("pig.job.submitted.timestamp", currentTime);
+
PhysicalOperator.setReporter(new FetchProgressableReporter());
SchemaTupleBackend.initialize(conf, pigContext);
@@ -125,13 +141,13 @@ public class FetchLauncher {
udfContext.serialize(conf);
PigMapReduce.sJobConfInternal.set(conf);
- String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
+ String dtzStr = conf.get("pig.datetime.default.tz");
if (dtzStr != null && dtzStr.length() > 0) {
// ensure that the internal timezone is uniformly in UTC offset style
DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
}
- boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+ boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
pigStatusReporter.setContext(new TaskContext<FetchContext>(new FetchContext()));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PigLogger.java Tue Apr 29 17:36:49 2014
@@ -28,7 +28,7 @@ import org.apache.pig.classification.Int
@InterfaceStability.Evolving
public interface PigLogger {
/**
- * If you have warning messages that need aggregation
+ * If you have warning messages that need aggregation
*/
public void warn(Object o, String msg, Enum<?> warningEnum);
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/AddDuration.java Tue Apr 29 17:36:49 2014
@@ -71,7 +71,7 @@ public class AddDuration extends EvalFun
@Override
public DateTime exec(Tuple input) throws IOException {
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/DaysBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class DaysBetween extends EvalFun
@Override
public Long exec(Tuple input) throws IOException
{
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/HoursBetween.java Tue Apr 29 17:36:49 2014
@@ -77,7 +77,7 @@ public class HoursBetween extends EvalFu
@Override
public Long exec(Tuple input) throws IOException
{
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/MilliSecondsBetween.java Tue Apr 29 17:36:49 2014
@@ -77,7 +77,7 @@ public class MilliSecondsBetween extends
@Override
public Long exec(Tuple input) throws IOException
{
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/MinutesBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class MinutesBetween extends Eval
@Override
public Long exec(Tuple input) throws IOException
{
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/MonthsBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class MonthsBetween extends EvalF
@Override
public Long exec(Tuple input) throws IOException
{
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/SecondsBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class SecondsBetween extends Eval
@Override
public Long exec(Tuple input) throws IOException
{
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/SubtractDuration.java Tue Apr 29 17:36:49 2014
@@ -71,7 +71,7 @@ public class SubtractDuration extends Ev
@Override
public DateTime exec(Tuple input) throws IOException {
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/WeeksBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class WeeksBetween extends EvalFu
@Override
public Long exec(Tuple input) throws IOException
{
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/YearsBetween.java Tue Apr 29 17:36:49 2014
@@ -78,7 +78,7 @@ public class YearsBetween extends EvalFu
@Override
public Long exec(Tuple input) throws IOException
{
- if (input == null || input.size() < 2) {
+ if (input == null || input.size() < 2 || input.get(0) == null || input.get(1) == null) {
return null;
}
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java Tue Apr 29 17:36:49 2014
@@ -280,7 +280,10 @@ public class ProjectExpression extends C
return fieldSchema;
LogicalRelationalOperator referent = findReferent();
- LogicalSchema schema = referent.getSchema();
+ LogicalSchema schema = null;
+ if (referent.getSchema()!=null) {
+ schema = referent.getSchema().deepCopy();
+ }
if (attachedRelationalOp instanceof LOGenerate && plan.getSuccessors(this)==null) {
if (!(findReferent() instanceof LOInnerLoad)||
Modified: pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Tue Apr 29 17:36:49 2014
@@ -261,7 +261,7 @@ TOKEN_MGR_DECLS : {
<"'"> {prevState = PIG_START;} : IN_STRING
| <"`"> {prevState = PIG_START;} : IN_COMMAND
| <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = PIG_START;} : SCHEMA_DEFINITION
-| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+ > {prevState = PIG_START;} : GENERATE
+| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+ > {prevState = PIG_START;} : GENERATE
| <"{"> {pigBlockLevel = 1;} : IN_BLOCK
| <"}"> {if (true) throw new TokenMgrError("Unmatched '}'", TokenMgrError.LEXICAL_ERROR);}
| <";"> : PIG_END
@@ -364,7 +364,7 @@ TOKEN_MGR_DECLS : {
{
<"\""> {prevState = IN_BLOCK;} : IN_DOUBLE_QUOTED_STRING
| <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = IN_BLOCK;} : SCHEMA_DEFINITION
-| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+> {prevState = IN_BLOCK;} : GENERATE
+| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+> {prevState = IN_BLOCK;} : GENERATE
| <"{"> {pigBlockLevel++;}
| <"}"(";")?> {pigBlockLevel--; if (pigBlockLevel == 0) SwitchTo(PIG_END);}
| <"'"> {prevState = IN_BLOCK;} : IN_STRING
Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
Merged /pig/trunk/src/pig-default.properties:r1586886-1591006
Modified: pig/branches/tez/test/org/apache/pig/TestMain.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/TestMain.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/TestMain.java (original)
+++ pig/branches/tez/test/org/apache/pig/TestMain.java Tue Apr 29 17:36:49 2014
@@ -24,8 +24,10 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.fail;
import java.io.BufferedWriter;
+import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
+import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
@@ -35,6 +37,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.parser.ParserException;
import org.apache.pig.parser.SourceLocation;
import org.apache.pig.test.TestPigRunner.TestNotificationListener;
+import org.apache.pig.test.Util;
import org.apache.pig.tools.parameters.ParameterSubstitutionException;
import org.apache.pig.tools.pigstats.PigStats;
import org.junit.Test;
@@ -126,6 +129,35 @@ public class TestMain {
}
}
+ @Test
+ public void testParseInputScript() throws Exception {
+ File input = Util.createInputFile("tmp", "",
+ new String[]{"{(1,1.0)}\ttestinputstring1",
+ "{(2,2.0)}\ttestinputstring1",
+ "{(3,3.0)}\ttestinputstring1",
+ "{(4,4.0)}\ttestinputstring1"}
+ );
+ File out = new File(System.getProperty("java.io.tmpdir")+"/testParseInputScriptOut");
+ File scriptFile = Util.createInputFile("pigScript", "",
+ new String[]{"A = load '"+input.getAbsolutePath()+"' as (a:{(x:chararray, y:float)}, b:chararray);",
+ "B = foreach A generate\n" +
+ " b,\n" +
+ " (bag{tuple(long)}) a.x as ax:{(x:long)};",
+ "store B into '"+out.getAbsolutePath()+"';"}
+ );
+
+ Main.run(new String[]{"-x", "local", scriptFile.getAbsolutePath()}, null);
+ BufferedReader file = new BufferedReader(new FileReader(new File(out.getAbsolutePath()+"/part-m-00000")));
+ String line;
+ int count = 0;
+ while(( line = file.readLine()) != null) {
+ count++;
+ }
+ assertEquals(4,count);
+ Util.deleteDirectory(new File(out.getAbsolutePath()));
+ assertTrue(!new File(out.getAbsolutePath()).exists());
+ }
+
public static class TestNotificationListener2 extends TestNotificationListener {
protected boolean hadArgs = false;
public TestNotificationListener2() {}
Modified: pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java Tue Apr 29 17:36:49 2014
@@ -1216,4 +1216,19 @@ public class TestEvalPipelineLocal {
Schema s = pigServer.dumpSchema("final");
Assert.assertEquals(s.toString(), "{n1::x3: int}");
}
+
+ // see PIG-3909
+ @Test
+ public void testCastSchemaShare() throws Exception{
+ File f1 = createFile(new String[]{"{([fieldkey1#polisan,fieldkey2#lily])}"});
+
+ pigServer.registerQuery("A = load '" + Util.encodeEscape(Util.generateURI(f1.toString(), pigServer.getPigContext()))
+ + "' as (bagofmap:{});");
+ pigServer.registerQuery("B = foreach A generate FLATTEN((IsEmpty(bagofmap) ? null : bagofmap)) AS bagofmap;");
+ pigServer.registerQuery("C = filter B by (chararray)bagofmap#'fieldkey1' matches 'po.*';");
+ pigServer.registerQuery("D = foreach C generate (chararray)bagofmap#'fieldkey2';");
+
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+ Assert.assertEquals(iter.next().toString(), "(lily)");
+ }
}
Modified: pig/branches/tez/test/org/apache/pig/test/TestFetch.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestFetch.java?rev=1591026&r1=1591025&r2=1591026&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestFetch.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestFetch.java Tue Apr 29 17:36:49 2014
@@ -18,14 +18,18 @@
package org.apache.pig.test;
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
import java.util.Random;
@@ -39,17 +43,21 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.parser.ParserTestingUtils;
import org.apache.pig.test.utils.GenPhyOp;
+import org.joda.time.DateTime;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
public class TestFetch {
private PigServer pigServer;
@@ -87,6 +95,8 @@ public class TestFetch {
@Before
public void setUp() throws Exception{
pigServer = new PigServer(ExecType.LOCAL, new Properties());
+ // force direct fetch mode
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.OPT_FETCH, "true");
}
@Test
@@ -279,6 +289,26 @@ public class TestFetch {
}
+ /**
+ * Tests whether 'pig.job.submitted.timestamp' has been set by FetchLauncher
+ * @throws Exception
+ */
+ @Test
+ public void test7() throws Exception {
+ Data data = resetData(pigServer);
+
+ List<Tuple> justSomeRows = Lists.newArrayListWithCapacity(1);
+ justSomeRows.add(tuple(1));
+ data.set("justSomeRows", justSomeRows);
+
+ pigServer.registerQuery("A = load 'justSomeRows' using mock.Storage();");
+ pigServer.registerQuery("B = foreach A generate CurrentTime();");
+ Iterator<Tuple> it = pigServer.openIterator("B");
+ DateTime received = (DateTime) it.next().get(0);
+ // any returned result indicates that the property was set correctly
+ assertNotNull(received);
+ }
+
@AfterClass
public static void tearDownOnce() throws Exception {
inputFile1.delete();