You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by jc...@apache.org on 2012/12/20 19:50:44 UTC
svn commit: r1424633 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/builtin/PigStorage.java
test/org/apache/pig/test/TestPigStorage.java
test/org/apache/pig/test/Util.java test/org/apache/pig/test/data/output1.pig
Author: jcoveney
Date: Thu Dec 20 18:50:44 2012
New Revision: 1424633
URL: http://svn.apache.org/viewvc?rev=1424633&view=rev
Log:
PIG-3100: If a .pig_schema file is present, can get an index out of bounds error (jcoveney)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/builtin/PigStorage.java
pig/trunk/test/org/apache/pig/test/TestPigStorage.java
pig/trunk/test/org/apache/pig/test/Util.java
pig/trunk/test/org/apache/pig/test/data/output1.pig
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1424633&r1=1424632&r2=1424633&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 20 18:50:44 2012
@@ -64,6 +64,8 @@ PIG-3013: BinInterSedes improve chararra
BUG FIXES
+PIG-3100: If a .pig_schema file is present, can get an index out of bounds error (jcoveney)
+
PIG-3096: Make PigUnit thread safe (cheolsoo)
PIG-3095: "which" is called many, many times for each Pig STREAM statement (nwhite via cheolsoo)
Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1424633&r1=1424632&r2=1424633&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Thu Dec 20 18:50:44 2012
@@ -81,7 +81,7 @@ import org.apache.pig.parser.ParserExcep
* An optional second constructor argument is provided that allows one to customize
* advanced behaviors. A list of available options is below:
* <ul>
- * <li><code>-schema</code> Reads/Stores the schema of the relation using a
+ * <li><code>-schema</code> Reads/Stores the schema of the relation using a
* hidden JSON file.
* <li><code>-noschema</code> Ignores a stored schema during loading.
* <li><code>-tagFile</code> Appends input source file name to beginning of each tuple.
@@ -94,7 +94,7 @@ import org.apache.pig.parser.ParserExcep
* field names and types of the data without the need for a user to explicitly provide the schema in an
* <code>as</code> clause, unless <code>-noschema</code> is specified. No attempt to merge conflicting
* schemas is made during loading. The first schema encountered during a file system scan is used.
- * If the schema file is not present while '-schema' option is used during loading,
+ * If the schema file is not present while '-schema' option is used during loading,
* it results in an error.
* <p>
* In addition, using <code>-schema</code> drops a ".pig_headers" file in the output directory.
@@ -107,7 +107,7 @@ import org.apache.pig.parser.ParserExcep
* The first field (0th index) in each Tuple will contain input file name.
* If<code>-tagPath</code> is specified, PigStorage will prepend input split path to each Tuple/row.
* Usage: A = LOAD 'input' using PigStorage(',','-tagPath'); B = foreach A generate $0;
- * The first field (0th index) in each Tuple will contain input file path
+ * The first field (0th index) in each Tuple will contain input file path
* <p>
* Note that regardless of whether or not you store the schema, you <b>always</b> need to specify
* the correct delimiter to read your data. If you store reading delimiter "#" and then load using
@@ -147,7 +147,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
protected boolean[] mRequiredColumns = null;
private boolean mRequiredColumnsInitialized = false;
-
+
// Indicates whether the input file name/path should be read.
private boolean tagFile = false;
private static final String TAG_SOURCE_FILE = "tagFile";
@@ -292,18 +292,21 @@ LoadPushDown, LoadMetadata, StoreMetadat
// only contains required fields.
// We walk the requiredColumns array to find required fields,
// and cast those.
- for (int i = 0; i < fieldSchemas.length; i++) {
+ for (int i = 0; i < Math.min(fieldSchemas.length, tup.size()); i++) {
if (mRequiredColumns == null || (mRequiredColumns.length>i && mRequiredColumns[i])) {
Object val = null;
if(tup.get(tupleIdx) != null){
byte[] bytes = ((DataByteArray) tup.get(tupleIdx)).get();
val = CastUtils.convertToType(caster, bytes,
fieldSchemas[i], fieldSchemas[i].getType());
+ tup.set(tupleIdx, val);
}
- tup.set(tupleIdx, val);
tupleIdx++;
}
}
+ for (int i = tup.size(); i < fieldSchemas.length; i++) {
+ tup.append(null);
+ }
}
return tup;
}
Modified: pig/trunk/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStorage.java?rev=1424633&r1=1424632&r2=1424633&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStorage.java Thu Dec 20 18:50:44 2012
@@ -19,9 +19,11 @@
package org.apache.pig.test;
import static org.apache.pig.ExecType.MAPREDUCE;
+import static org.apache.pig.builtin.mock.Storage.tuple;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
@@ -35,6 +37,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Job;
@@ -191,7 +194,7 @@ public class TestPigStorage {
assertFalse(it.hasNext());
}
-
+
@Test
public void testPigStorageNoSchema() throws Exception {
//if the schema file does not exist, and '-schema' option is used
@@ -403,15 +406,15 @@ public class TestPigStorage {
header = Util.readOutput(pig.getPigContext(), outPath);
Assert.assertArrayEquals("Headers are not the same.", new String[] {"foo\tbar"}, header);
}
-
+
private void putInputFile(String filename) throws IOException {
Util.createLocalInputFile(filename, new String[] {});
}
-
+
private void putSchemaFile(String schemaFilename, ResourceSchema testSchema) throws JsonGenerationException, JsonMappingException, IOException {
new ObjectMapper().writeValue(new File(schemaFilename), testSchema);
}
-
+
@Test
public void testPigStorageSchemaSearch() throws Exception {
String globtestdir = "build/test/tmpglobbingdata/";
@@ -427,28 +430,28 @@ public class TestPigStorage {
putInputFile(globtestdir+"a/b0/input");
pig.mkdirs(globtestdir+"b");
} catch (IOException e) {};
-
+
// if schema file is not found, schema is null
ResourceSchema schema = pigStorage.getSchema(globtestdir, new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
Assert.assertTrue(schema==null);
-
+
// if .pig_schema is in the input directory
putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
schema = pigStorage.getSchema(globtestdir+"a/a0", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
new File(globtestdir+"a/a0/.pig_schema").delete();
-
+
// .pig_schema in one of globStatus returned directory
putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
new File(globtestdir+"a/.pig_schema").delete();
-
+
putSchemaFile(globtestdir+"b/.pig_schema", testSchema);
schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
new File(globtestdir+"b/.pig_schema").delete();
-
+
// if .pig_schema is deep in the globbing, it will not get used
putSchemaFile(globtestdir+"a/a0/.pig_schema", testSchema);
schema = pigStorage.getSchema(globtestdir+"*", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
@@ -458,18 +461,18 @@ public class TestPigStorage {
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
new File(globtestdir+"a/a0/.pig_schema").delete();
new File(globtestdir+"a/.pig_schema").delete();
-
+
pigStorage = new PigStorage("\t", "-schema");
putSchemaFile(globtestdir+"a/.pig_schema", testSchema);
schema = pigStorage.getSchema(globtestdir+"{a,b}", new Job(ConfigurationUtil.toConfiguration(pigContext.getProperties())));
Assert.assertTrue(ResourceSchema.equals(schema, testSchema));
}
-
+
/**
* This is for testing source tagging option on PigStorage. When a user
* specifies '-tagFile' as an option, PigStorage must prepend the input
* source path to the tuple and "INPUT_FILE_NAME" to schema.
- *
+ *
* @throws Exception
*/
@Test
@@ -492,7 +495,7 @@ public class TestPigStorage {
fileAliases,fileTypes);
Assert.assertTrue("schema with -tagFile preprends INPUT_FILE_NAME",
Schema.equals(newSchema, genSchema, true, false));
-
+
// Verify that loading a-out with '-tagPath' produces
// the original schema, and prepends 'INPUT_FILE_PATH' to
// original schema.
@@ -504,7 +507,7 @@ public class TestPigStorage {
Assert.assertTrue("schema with -tagPath preprends INPUT_FILE_PATH",
Schema.equals(newSchema, genSchema, true, false));
-
+
// Verify that explicitly requesting no schema works
pig.registerQuery("d = LOAD '" + datadir + "aout' using PigStorage('\\t', '-noschema');");
genSchema = pig.dumpSchema("d");
@@ -520,7 +523,7 @@ public class TestPigStorage {
Assert.assertTrue("explicit schema overrides metadata",
Schema.equals(newSchema, genSchema, true, false));
}
-
+
@Test
public void testPigStorageSourceTagValue() throws Exception {
final String storeFileName = "part-m-00000";
@@ -531,7 +534,7 @@ public class TestPigStorage {
pig.registerQuery(query);
// Storing in 'aout' directory will store contents in part-m-00000
pig.store("a", datadir + "aout", "PigStorage('\\t', '-schema')");
-
+
// Verify input source tag is present when using -tagFile or -tagPath
pig.registerQuery("b = LOAD '" + datadir + "aout' using PigStorage('\\t', '-tagFile');");
pig.registerQuery("c = foreach b generate INPUT_FILE_NAME;");
@@ -541,5 +544,32 @@ public class TestPigStorage {
String inputFileName = (String)tuple.get(0);
assertEquals("tagFile value must be part-m-00000", inputFileName, storeFileName);
}
- }
+ }
+
+ @Test
+ public void testIncompleteDataWithPigSchema() throws Exception {
+ File parent = new File(datadir, "incomplete_data_with_pig_schema_1");
+ parent.deleteOnExit();
+ parent.mkdirs();
+ File tmpInput = File.createTempFile("tmp", "tmp");
+ tmpInput.deleteOnExit();
+ File outFile = new File(parent, "out");
+ pig.registerQuery("a = load '"+tmpInput.getAbsolutePath()+"' as (x:int, y:chararray, z:chararray);");
+ pig.store("a", outFile.getAbsolutePath(), "PigStorage('\\t', '-schema')");
+ File schemaFile = new File(outFile, ".pig_schema");
+
+ parent = new File(datadir, "incomplete_data_with_pig_schema_2");
+ parent.deleteOnExit();
+ File inputDir = new File(parent, "input");
+ inputDir.mkdirs();
+ File inputSchemaFile = new File(inputDir, ".pig_schema");
+ FileUtils.moveFile(schemaFile, inputSchemaFile);
+ File inputFile = new File(inputDir, "data");
+ Util.writeToFile(inputFile, new String[]{"1"});
+ pig.registerQuery("a = load '"+inputDir.getAbsolutePath()+"';");
+ Iterator<Tuple> it = pig.openIterator("a");
+ assertTrue(it.hasNext());
+ assertEquals(tuple(1,null,null), it.next());
+ assertFalse(it.hasNext());
+ }
}
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1424633&r1=1424632&r2=1424633&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu Dec 20 18:50:44 2012
@@ -88,11 +88,11 @@ import org.apache.pig.impl.util.LogUtils
import org.apache.pig.newplan.logical.optimizer.DanglingNestedNodeRemover;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
+import org.apache.pig.newplan.logical.optimizer.UidResetter;
import org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
-import org.apache.pig.newplan.logical.optimizer.UidResetter;
import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
import org.apache.pig.newplan.logical.visitor.ScalarVisitor;
@@ -180,7 +180,7 @@ public class Util {
for(int i = 0; i < b.length; i++)
t.append(b[i]);
}
-
+
static public Tuple buildTuple(Object... args) throws ExecException {
return TupleFactory.getInstance().newTupleNoCopy(Lists.newArrayList(args));
}
@@ -189,7 +189,7 @@ public class Util {
return TupleFactory.getInstance().newTuple(Lists.transform(
Lists.newArrayList(args), new Function<Object, DataByteArray>() {
public DataByteArray apply(Object o) {
- if (o == null) {
+ if (o == null) {
return null;
}
try {
@@ -207,14 +207,14 @@ public class Util {
addToTuple(t, s);
return t;
}
-
+
static public DataBag createBag(Tuple[] t)
{
DataBag b = mBagFactory.newDefaultBag();
for(int i = 0; i < t.length; i++)b.add(t[i]);
return b;
}
-
+
static public<T> DataBag createBagOfOneColumn(T[] input) throws ExecException {
DataBag result = mBagFactory.newDefaultBag();
for (int i = 0; i < input.length; i++) {
@@ -224,7 +224,7 @@ public class Util {
}
return result;
}
-
+
static public Map<String, Object> createMap(String[] contents)
{
Map<String, Object> m = new HashMap<String, Object>();
@@ -239,10 +239,10 @@ public class Util {
DataByteArray[] dbas = new DataByteArray[input.length];
for (int i = 0; i < input.length; i++) {
dbas[i] = (input[i] == null)?null:new DataByteArray(input[i].toString().getBytes());
- }
+ }
return dbas;
}
-
+
static public Tuple loadNestTuple(Tuple t, int[][] input) throws ExecException {
for (int i = 0; i < input.length; i++) {
DataBag bag = BagFactory.getInstance().newDefaultBag();
@@ -278,7 +278,7 @@ public class Util {
/**
* Helper to create a temporary file with given input data for use in test cases.
- *
+ *
* @param tmpFilenamePrefix file-name prefix
* @param tmpFilenameSuffix file-name suffix
* @param inputData input for test cases, each string in inputData[] is written
@@ -286,52 +286,52 @@ public class Util {
* @return {@link File} handle to the created temporary file
* @throws IOException
*/
- static public File createInputFile(String tmpFilenamePrefix,
- String tmpFilenameSuffix,
- String[] inputData)
+ static public File createInputFile(String tmpFilenamePrefix,
+ String tmpFilenameSuffix,
+ String[] inputData)
throws IOException {
File f = File.createTempFile(tmpFilenamePrefix, tmpFilenameSuffix);
f.deleteOnExit();
- writeToFile(f, inputData);
+ writeToFile(f, inputData);
return f;
}
-
- static public File createLocalInputFile(String filename, String[] inputData)
+
+ static public File createLocalInputFile(String filename, String[] inputData)
throws IOException {
File f = new File(filename);
f.deleteOnExit();
- writeToFile(f, inputData);
+ writeToFile(f, inputData);
return f;
}
-
- private static void writeToFile(File f, String[] inputData) throws
+
+ public static void writeToFile(File f, String[] inputData) throws
IOException {
- PrintWriter pw = new PrintWriter(new OutputStreamWriter(new
+ PrintWriter pw = new PrintWriter(new OutputStreamWriter(new
FileOutputStream(f), "UTF-8"));
for (int i=0; i<inputData.length; i++){
pw.println(inputData[i]);
}
pw.close();
}
-
+
/**
* Helper to create a dfs file on the Minicluster DFS with given
* input data for use in test cases.
- *
+ *
* @param miniCluster reference to the Minicluster where the file should be created
* @param fileName pathname of the file to be created
* @param inputData input for test cases, each string in inputData[] is written
* on one line
* @throws IOException
*/
- static public void createInputFile(MiniCluster miniCluster, String fileName,
- String[] inputData)
+ static public void createInputFile(MiniCluster miniCluster, String fileName,
+ String[] inputData)
throws IOException {
FileSystem fs = miniCluster.getFileSystem();
createInputFile(fs, fileName, inputData);
}
-
- static public void createInputFile(FileSystem fs, String fileName,
+
+ static public void createInputFile(FileSystem fs, String fileName,
String[] inputData) throws IOException {
if(fs.exists(new Path(fileName))) {
throw new IOException("File " + fileName + " already exists on the FileSystem");
@@ -344,7 +344,7 @@ public class Util {
pw.close();
}
-
+
static public String[] readOutput(FileSystem fs, String fileName) throws IOException {
Path path = new Path(fileName);
if(!fs.exists(path)) {
@@ -373,11 +373,11 @@ public class Util {
}
return result.toArray(new String[result.size()]);
}
-
+
/**
* Helper to create a dfs file on the MiniCluster dfs. This returns an
* outputstream that can be used in test cases to write data.
- *
+ *
* @param cluster
* reference to the MiniCluster where the file should be created
* @param fileName
@@ -395,13 +395,13 @@ public class Util {
}
return fs.create(new Path(fileName));
}
-
+
/**
* Helper to create an empty temp file on local file system
* which will be deleted on exit
* @param prefix
* @param suffix
- * @return File denoting a newly-created empty file
+ * @return File denoting a newly-created empty file
* @throws IOException
*/
static public File createTempFileDelOnExit(String prefix, String suffix)
@@ -411,84 +411,84 @@ public class Util {
return tmpFile;
}
-
+
/**
* Helper to remove a dfs file from the minicluster DFS
- *
+ *
* @param miniCluster reference to the Minicluster where the file should be deleted
* @param fileName pathname of the file to be deleted
* @throws IOException
*/
- static public void deleteFile(MiniCluster miniCluster, String fileName)
+ static public void deleteFile(MiniCluster miniCluster, String fileName)
throws IOException {
FileSystem fs = miniCluster.getFileSystem();
fs.delete(new Path(fileName), true);
}
- static public void deleteFile(PigContext pigContext, String fileName)
+ static public void deleteFile(PigContext pigContext, String fileName)
throws IOException {
Configuration conf = ConfigurationUtil.toConfiguration(
pigContext.getProperties());
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path(fileName), true);
}
-
- static public boolean exists(PigContext pigContext, String fileName)
+
+ static public boolean exists(PigContext pigContext, String fileName)
throws IOException {
Configuration conf = ConfigurationUtil.toConfiguration(
pigContext.getProperties());
FileSystem fs = FileSystem.get(conf);
return fs.exists(new Path(fileName));
}
-
+
/**
- * Helper function to check if the result of a Pig Query is in line with
+ * Helper function to check if the result of a Pig Query is in line with
* expected results.
- *
+ *
* @param actualResults Result of the executed Pig query
* @param expectedResults Expected results Array to validate against
*/
- static public void checkQueryOutputs(Iterator<Tuple> actualResults,
+ static public void checkQueryOutputs(Iterator<Tuple> actualResults,
Tuple[] expectedResults) {
for (Tuple expected : expectedResults) {
Tuple actual = actualResults.next();
Assert.assertEquals(expected.toString(), actual.toString());
}
}
-
+
/**
- * Helper function to check if the result of a Pig Query is in line with
+ * Helper function to check if the result of a Pig Query is in line with
* expected results.
- *
+ *
* @param actualResults Result of the executed Pig query
* @param expectedResults Expected results List to validate against
*/
- static public void checkQueryOutputs(Iterator<Tuple> actualResults,
+ static public void checkQueryOutputs(Iterator<Tuple> actualResults,
List<Tuple> expectedResults) {
-
+
checkQueryOutputs(actualResults,expectedResults.toArray(new Tuple[expectedResults.size()]));
}
/**
- * Helper function to check if the result of a Pig Query is in line with
+ * Helper function to check if the result of a Pig Query is in line with
* expected results. It sorts actual and expected results before comparison
- *
+ *
* @param actualResultsIt Result of the executed Pig query
* @param expectedResList Expected results to validate against
*/
- static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt,
+ static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt,
List<Tuple> expectedResList) {
List<Tuple> actualResList = new ArrayList<Tuple>();
while(actualResultsIt.hasNext()){
actualResList.add(actualResultsIt.next());
}
-
+
compareActualAndExpectedResults(actualResList, expectedResList);
-
+
}
-
-
+
+
static public void compareActualAndExpectedResults(
List<Tuple> actualResList, List<Tuple> expectedResList) {
Collections.sort(actualResList);
@@ -496,11 +496,11 @@ public class Util {
Assert.assertEquals("Comparing actual and expected results. ",
expectedResList, actualResList);
-
+
}
/**
- * Check if subStr is a subString of str . calls org.junit.Assert.fail if it is not
+ * Check if subStr is a subString of str . calls org.junit.Assert.fail if it is not
* @param str
* @param subStr
*/
@@ -509,7 +509,7 @@ public class Util {
fail("String '"+ subStr + "' is not a substring of '" + str + "'");
}
}
-
+
/**
* Check if query plan for alias argument produces exception with expected
* error message in expectedErr argument.
@@ -541,17 +541,17 @@ public class Util {
PigException pigEx = LogUtils.getPigException(e);
String message = pigEx.getMessage();
checkErrorMessageContainsExpected(message, expectedErr);
-
+
}
public static void checkErrorMessageContainsExpected(String message, String expectedMessage){
if(!message.contains(expectedMessage)){
- String msg = "Expected error message containing '"
+ String msg = "Expected error message containing '"
+ expectedMessage + "' but got '" + message + "'" ;
fail(msg);
- }
+ }
}
-
+
static private String getMkDirCommandForHadoop2_0(String fileName) {
if (Util.isHadoop23() || Util.isHadoop2_0()) {
Path parentDir = new Path(fileName).getParent();
@@ -560,7 +560,7 @@ public class Util {
}
return "";
}
-
+
/**
* Utility method to copy a file form local filesystem to the dfs on
* the minicluster for testing in mapreduce mode
@@ -582,14 +582,14 @@ public class Util {
throw new IOException(e);
}
}
-
+
static public void copyFromLocalToLocal(String fromLocalFileName,
String toLocalFileName) throws IOException {
PigServer ps = new PigServer(ExecType.LOCAL, new Properties());
String script = getMkDirCommandForHadoop2_0(toLocalFileName) + "fs -cp " + fromLocalFileName + " " + toLocalFileName;
new File(toLocalFileName).deleteOnExit();
-
+
GruntParser parser = new GruntParser(new StringReader(script));
parser.setInteractive(false);
parser.setParams(ps);
@@ -598,22 +598,22 @@ public class Util {
} catch (org.apache.pig.tools.pigscript.parser.ParseException e) {
throw new IOException(e);
}
-
+
}
-
+
static public void copyFromClusterToLocal(MiniCluster cluster, String fileNameOnCluster, String localFileName) throws IOException {
File parent = new File(localFileName).getParentFile();
if (!parent.exists()) {
parent.mkdirs();
}
PrintWriter writer = new PrintWriter(new FileWriter(localFileName));
-
+
FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
cluster.getProperties()));
if(!fs.exists(new Path(fileNameOnCluster))) {
throw new IOException("File " + fileNameOnCluster + " does not exists on the minicluster");
}
-
+
String line = null;
FileStatus fst = fs.getFileStatus(new Path(fileNameOnCluster));
if(fst.isDir()) {
@@ -623,14 +623,14 @@ public class Util {
FSDataInputStream stream = fs.open(new Path(fileNameOnCluster));
BufferedReader reader = new BufferedReader(new InputStreamReader(stream));
while( (line = reader.readLine()) != null) {
- writer.println(line);
+ writer.println(line);
}
-
+
reader.close();
writer.close();
}
-
- static public void printQueryOutput(Iterator<Tuple> actualResults,
+
+ static public void printQueryOutput(Iterator<Tuple> actualResults,
Tuple[] expectedResults) {
System.out.println("Expected :") ;
@@ -638,7 +638,7 @@ public class Util {
System.out.println(expected.toString()) ;
}
System.out.println("---End----") ;
-
+
System.out.println("Actual :") ;
while (actualResults.hasNext()) {
System.out.println(actualResults.next().toString()) ;
@@ -647,10 +647,10 @@ public class Util {
}
/**
- * Helper method to replace all occurrences of "\" with "\\" in a
+ * Helper method to replace all occurrences of "\" with "\\" in a
* string. This is useful to fix the file path string on Windows
* where "\" is used as the path separator.
- *
+ *
* @param str Any string
* @return The resulting string
*/
@@ -660,7 +660,7 @@ public class Util {
return str.replaceAll(regex, replacement);
}
- public static String generateURI(String filename, PigContext context)
+ public static String generateURI(String filename, PigContext context)
throws IOException {
if (context.getExecType() == ExecType.MAPREDUCE) {
return FileLocalizer.hadoopify(filename, context);
@@ -672,11 +672,11 @@ public class Util {
}
public static Object getPigConstant(String pigConstantAsString) throws ParserException {
- QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
+ QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
"util", new HashMap<String, String>() ) ;
return queryParser.parseConstant(pigConstantAsString);
}
-
+
/**
* Parse list of strings in to list of tuples, convert quoted strings into
* @param tupleConstants
@@ -707,7 +707,7 @@ public class Util {
}
return tuples;
}
-
+
/**
* Convert String objects in argument t to DataByteArray objects
* @param t
@@ -721,8 +721,8 @@ public class Util {
if(col == null)
continue;
if(col instanceof String){
- DataByteArray dba = (col == null) ?
- null : new DataByteArray((String)col);
+ DataByteArray dba = (col == null) ?
+ null : new DataByteArray((String)col);
t.set(i, dba);
}else if(col instanceof Tuple){
convertStringToDataByteArray((Tuple)col);
@@ -733,8 +733,8 @@ public class Util {
}
}
-
- }
+
+ }
}
public static File createFile(String[] data) throws Exception{
@@ -746,42 +746,42 @@ public class Util {
pw.close();
return f;
}
-
+
/**
* Run default set of optimizer rules on new logical plan
* @param lp
* @return optimized logical plan
* @throws FrontendException
*/
- public static LogicalPlan optimizeNewLP(
+ public static LogicalPlan optimizeNewLP(
LogicalPlan lp)
throws FrontendException{
DanglingNestedNodeRemover DanglingNestedNodeRemover = new DanglingNestedNodeRemover( lp );
DanglingNestedNodeRemover.visit();
-
+
UidResetter uidResetter = new UidResetter( lp );
uidResetter.visit();
-
- SchemaResetter schemaResetter =
+
+ SchemaResetter schemaResetter =
new SchemaResetter( lp, true /*disable duplicate uid check*/ );
schemaResetter.visit();
StoreAliasSetter storeAliasSetter = new StoreAliasSetter( lp );
storeAliasSetter.visit();
-
+
// run optimizer
- org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer optimizer =
+ org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer optimizer =
new org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(lp, 100, null);
optimizer.optimize();
-
+
SortInfoSetter sortInfoSetter = new SortInfoSetter( lp );
sortInfoSetter.visit();
-
+
return lp;
}
-
+
/**
- * migrate old LP(logical plan) to new LP, optimize it, and build physical
+ * migrate old LP(logical plan) to new LP, optimize it, and build physical
* plan
* @param lp
* @param pc PigContext
@@ -796,16 +796,16 @@ public class Util {
visitor.visit();
return visitor.getPhysicalPlan();
}
-
+
public static MROperPlan buildMRPlan(PhysicalPlan pp, PigContext pc) throws Exception{
MRCompiler comp = new MRCompiler(pp, pc);
comp.compile();
- return comp.getMRPlan();
+ return comp.getMRPlan();
}
-
+
public static MROperPlan buildMRPlanWithOptimizer(PhysicalPlan pp, PigContext pc) throws Exception {
MapRedUtil.checkLeafIsStore(pp, pc);
-
+
MapReduceLauncher launcher = new MapReduceLauncher();
java.lang.reflect.Method compile = launcher.getClass()
@@ -816,7 +816,7 @@ public class Util {
return (MROperPlan) compile.invoke(launcher, new Object[] { pp, pc });
}
-
+
public static MROperPlan buildMRPlan(String query, PigContext pc) throws Exception {
LogicalPlan lp = Util.parse(query, pc);
Util.optimizeNewLP(lp);
@@ -824,7 +824,7 @@ public class Util {
MROperPlan mrp = Util.buildMRPlanWithOptimizer(pp, pc);
return mrp;
}
-
+
public static void registerMultiLineQuery(PigServer pigServer, String query) throws IOException {
File f = File.createTempFile("tmp", "");
PrintWriter pw = new PrintWriter(f);
@@ -832,13 +832,13 @@ public class Util {
pw.close();
pigServer.registerScript(f.getCanonicalPath());
}
-
+
public static int executeJavaCommand(String cmd) throws Exception {
return executeJavaCommandAndReturnInfo(cmd).exitCode;
}
-
-
- public static ProcessReturnInfo executeJavaCommandAndReturnInfo(String cmd)
+
+
+ public static ProcessReturnInfo executeJavaCommandAndReturnInfo(String cmd)
throws Exception {
String javaHome = System.getenv("JAVA_HOME");
if(javaHome != null) {
@@ -853,7 +853,7 @@ public class Util {
pri.exitCode = cmdProc.exitValue();
return pri;
}
-
+
private static String getContents(InputStream istr) throws IOException {
BufferedReader br = new BufferedReader(
new InputStreamReader(istr));
@@ -863,20 +863,20 @@ public class Util {
s += line + "\n";
}
return s;
-
+
}
public static class ProcessReturnInfo {
public int exitCode;
public String stderrContents;
public String stdoutContents;
-
+
@Override
public String toString() {
return "[Exit code: " + exitCode + ", stdout: <" + stdoutContents + ">, " +
- "stderr: <" + stderrContents + ">";
+ "stderr: <" + stderrContents + ">";
}
}
-
+
static public boolean deleteDirectory(File path) {
if(path.exists()) {
File[] files = path.listFiles();
@@ -896,22 +896,22 @@ public class Util {
* @param pigContext
* @param fileName
* @param input
- * @throws IOException
+ * @throws IOException
*/
public static void createInputFile(PigContext pigContext,
String fileName, String[] input) throws IOException {
Configuration conf = ConfigurationUtil.toConfiguration(
pigContext.getProperties());
- createInputFile(FileSystem.get(conf), fileName, input);
+ createInputFile(FileSystem.get(conf), fileName, input);
}
-
+
public static String[] readOutput(PigContext pigContext,
String fileName) throws IOException {
Configuration conf = ConfigurationUtil.toConfiguration(
pigContext.getProperties());
- return readOutput(FileSystem.get(conf), fileName);
+ return readOutput(FileSystem.get(conf), fileName);
}
-
+
public static void printPlan(LogicalPlan logicalPlan ) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(out);
@@ -939,9 +939,9 @@ public class Util {
reader.close();
return tuples;
}
-
+
/**
- * Delete the existing logFile for the class and set the logging to a
+ * Delete the existing logFile for the class and set the logging to a
* use a new log file and set log level to DEBUG
* @param clazz class for which the log file is being set
* @param logFile current log file
@@ -963,10 +963,10 @@ public class Util {
}
/**
- * Check if logFile (does not/)contains the given list of messages.
+ * Check if logFile (does not/)contains the given list of messages.
* @param logFile
* @param messages
- * @param expected if true, the messages are expected in the logFile,
+ * @param expected if true, the messages are expected in the logFile,
* otherwise messages should not be there in the log
*/
public static void checkLogFileMessage(File logFile, String[] messages, boolean expected) {
@@ -999,7 +999,7 @@ public class Util {
fail("caught exception while checking log message :" + e);
}
}
-
+
public static LogicalPlan buildLp(PigServer pigServer, String query)
throws Exception {
pigServer.setBatchOn();
@@ -1014,50 +1014,50 @@ public class Util {
buildLp( pigServer, query );
java.lang.reflect.Method compilePp = pigServer.getClass().getDeclaredMethod("compilePp" );
compilePp.setAccessible(true);
-
+
return (PhysicalPlan)compilePp.invoke( pigServer );
-
+
}
public static LogicalPlan parse(String query, PigContext pc) throws FrontendException {
Map<String, String> fileNameMap = new HashMap<String, String>();
QueryParserDriver parserDriver = new QueryParserDriver( pc, "test", fileNameMap );
org.apache.pig.newplan.logical.relational.LogicalPlan lp = parserDriver.parse( query );
-
+
new ColumnAliasConversionVisitor(lp).visit();
new SchemaAliasVisitor(lp).visit();
new ScalarVisitor(lp, pc, "test").visit();
-
+
CompilationMessageCollector collector = new CompilationMessageCollector() ;
-
+
new TypeCheckingRelVisitor( lp, collector).visit();
-
+
new UnionOnSchemaSetter( lp ).visit();
new CastLineageSetter(lp, collector).visit();
return lp;
}
-
+
public static LogicalPlan parseAndPreprocess(String query, PigContext pc) throws FrontendException {
Map<String, String> fileNameMap = new HashMap<String, String>();
QueryParserDriver parserDriver = new QueryParserDriver( pc, "test", fileNameMap );
org.apache.pig.newplan.logical.relational.LogicalPlan lp = parserDriver.parse( query );
-
+
new ColumnAliasConversionVisitor( lp ).visit();
new SchemaAliasVisitor( lp ).visit();
new ScalarVisitor(lp, pc, "test").visit();
-
+
CompilationMessageCollector collector = new CompilationMessageCollector() ;
-
+
new TypeCheckingRelVisitor( lp, collector).visit();
-
+
new UnionOnSchemaSetter( lp ).visit();
new CastLineageSetter(lp, collector).visit();
return lp;
}
-
-
+
+
/**
- * Replaces any alias in given schema that has name that starts with
+ * Replaces any alias in given schema that has name that starts with
* "NullAlias" with null . it does a case insensitive comparison of
* the alias name
* @param sch
@@ -1073,15 +1073,15 @@ public class Util {
}
}
-
- static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt,
+
+ static public void checkQueryOutputsAfterSort(Iterator<Tuple> actualResultsIt,
Tuple[] expectedResArray) {
List<Tuple> list = new ArrayList<Tuple>();
Collections.addAll(list, expectedResArray);
checkQueryOutputsAfterSort(actualResultsIt, list);
}
-
-
+
+
static private void convertBagToSortedBag(Tuple t) {
for (int i=0;i<t.size();i++) {
Object obj = null;
@@ -1107,53 +1107,53 @@ public class Util {
}
}
}
-
- static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
+
+ static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
String[] expectedResArray, String schemaString) throws IOException {
LogicalSchema resultSchema = org.apache.pig.impl.util.Utils.parseSchema(schemaString);
checkQueryOutputsAfterSortRecursive(actualResultsIt, expectedResArray, resultSchema);
}
/**
- * Helper function to check if the result of a Pig Query is in line with
+ * Helper function to check if the result of a Pig Query is in line with
* expected results. It sorts actual and expected string results before comparison
- *
+ *
* @param actualResultsIt Result of the executed Pig query
* @param expectedResArray Expected string results to validate against
* @param fs fieldSchema of expecteResArray
- * @throws IOException
+ * @throws IOException
*/
- static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
+ static public void checkQueryOutputsAfterSortRecursive(Iterator<Tuple> actualResultsIt,
String[] expectedResArray, LogicalSchema schema) throws IOException {
LogicalFieldSchema fs = new LogicalFieldSchema("tuple", schema, DataType.TUPLE);
ResourceFieldSchema rfs = new ResourceFieldSchema(fs);
-
+
LoadCaster caster = new Utf8StorageConverter();
List<Tuple> actualResList = new ArrayList<Tuple>();
while(actualResultsIt.hasNext()){
actualResList.add(actualResultsIt.next());
}
-
+
List<Tuple> expectedResList = new ArrayList<Tuple>();
for (String str : expectedResArray) {
Tuple newTuple = caster.bytesToTuple(str.getBytes(), rfs);
expectedResList.add(newTuple);
}
-
+
for (Tuple t : actualResList) {
convertBagToSortedBag(t);
}
-
+
for (Tuple t : expectedResList) {
convertBagToSortedBag(t);
}
-
+
Collections.sort(actualResList);
Collections.sort(expectedResList);
-
+
Assert.assertEquals("Comparing actual and expected results. ",
expectedResList, actualResList);
}
-
+
public static String readFile(File file) throws IOException {
BufferedReader reader = new BufferedReader(new FileReader(file));
String result = "";
Modified: pig/trunk/test/org/apache/pig/test/data/output1.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/output1.pig?rev=1424633&r1=1424632&r2=1424633&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/output1.pig (original)
+++ pig/trunk/test/org/apache/pig/test/data/output1.pig Thu Dec 20 18:50:44 2012
@@ -1,7 +1,3 @@
-define mymacro1(A, rate) returns B {
- C = group $A by $0 parallel $rate;
- B = foreach C generate group, COUNT($A);
-};
aa = load '/data/intermediate/pow/elcarobootstrap/account/full/weekly/data/20080228' using PigStorage('\x01');
bb = filter aa by (ARITY == '16') and ( $4 eq '' or $4 eq 'NULL' or $4 eq 'ss') parallel 400;