You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/04 23:20:54 UTC
svn commit: r1584916 - in /pig/branches/branch-0.12: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/
Author: cheolsoo
Date: Fri Apr 4 21:20:54 2014
New Revision: 1584916
URL: http://svn.apache.org/r1584916
Log:
PIG-3661: Piggybank AvroStorage fails if used in more than one load or store statement (rohini via cheolsoo)
Modified:
pig/branches/branch-0.12/CHANGES.txt
pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
Modified: pig/branches/branch-0.12/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/CHANGES.txt?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/CHANGES.txt (original)
+++ pig/branches/branch-0.12/CHANGES.txt Fri Apr 4 21:20:54 2014
@@ -34,6 +34,8 @@ PIG-3480: TFile-based tmpfile compressio
BUG FIXES
+PIG-3661: Piggybank AvroStorage fails if used in more than one load or store statement (rohini via cheolsoo)
+
PIG-3819: e2e tests containing "perl -e "print $_;" fails on Hadoop 2 (daijy)
PIG-3813: Rank column is assigned different uids everytime when schema is reset (cheolsoo)
Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java Fri Apr 4 21:20:54 2014
@@ -21,16 +21,14 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
-import java.util.HashSet;
-import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
-import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -105,6 +103,7 @@ public class AvroStorage extends FileInp
private boolean checkSchema = true; /*whether check schema of input directories*/
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
+ private String contextSignature = null;
/**
* Empty constructor. Output schema is derived from pig schema.
@@ -151,11 +150,15 @@ public class AvroStorage extends FileInp
if (inputAvroSchema != null) {
return;
}
- Set<Path> paths = new HashSet<Path>();
+
Configuration conf = job.getConfiguration();
- if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
+ Set<Path> paths = AvroStorageUtils.getPaths(location, conf, true);
+ if (!paths.isEmpty()) {
+ // Set top level directories in input format. Adding all files will
+ // bloat configuration size
+ FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
+ // Scan all directories including sub directories for schema
setInputAvroSchema(paths, conf);
- FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
} else {
throw new IOException("Input path \'" + location + "\' is not found");
}
@@ -193,9 +196,17 @@ public class AvroStorage extends FileInp
if (paths == null || paths.isEmpty()) {
return null;
}
- Path path = paths.iterator().next();
- FileSystem fs = FileSystem.get(path.toUri(), conf);
- return getAvroSchema(path, fs);
+ Iterator<Path> iterator = paths.iterator();
+ Schema schema = null;
+ while (iterator.hasNext()) {
+ Path path = iterator.next();
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ schema = getAvroSchema(path, fs);
+ if (schema != null) {
+ break;
+ }
+ }
+ return schema;
}
/**
@@ -237,7 +248,7 @@ public class AvroStorage extends FileInp
System.out.println("Do not check schema; use schema of " + s.getPath());
return schema;
}
- } else if (!schema.equals(newSchema)) {
+ } else if (newSchema != null && !schema.equals(newSchema)) {
throw new IOException( "Input path is " + path + ". Sub-direcotry " + s.getPath()
+ " contains different schema " + newSchema + " than " + schema);
}
@@ -254,19 +265,23 @@ public class AvroStorage extends FileInp
* Merge multiple input avro schemas into one. Note that we can't merge arbitrary schemas.
* Please see AvroStorageUtils.mergeSchema() for what's allowed and what's not allowed.
*
- * @param paths set of input files
+ * @param basePaths set of input dir or files
* @param conf configuration
* @return avro schema
* @throws IOException
*/
- protected Schema getMergedSchema(Set<Path> paths, Configuration conf) throws IOException {
+ protected Schema getMergedSchema(Set<Path> basePaths, Configuration conf) throws IOException {
Schema result = null;
Map<Path, Schema> mergedFiles = new HashMap<Path, Schema>();
+
+ Set<Path> paths = AvroStorageUtils.getAllFilesRecursively(basePaths, conf);
for (Path path : paths) {
FileSystem fs = FileSystem.get(path.toUri(), conf);
Schema schema = getSchema(path, fs);
- result = AvroStorageUtils.mergeSchema(result, schema);
- mergedFiles.put(path, schema);
+ if (schema != null) {
+ result = AvroStorageUtils.mergeSchema(result, schema);
+ mergedFiles.put(path, schema);
+ }
}
// schemaToMergedSchemaMap is only needed when merging multiple records.
if (mergedFiles.size() > 1 && result.getType().equals(Schema.Type.RECORD)) {
@@ -302,6 +317,9 @@ public class AvroStorage extends FileInp
protected Schema getSchemaFromFile(Path path, FileSystem fs) throws IOException {
/* get path of the last file */
Path lastFile = AvroStorageUtils.getLast(path, fs);
+ if (lastFile == null) {
+ return null;
+ }
/* read in file and obtain schema */
GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
@@ -360,9 +378,14 @@ public class AvroStorage extends FileInp
/* get avro schema */
AvroStorageLog.funcCall("getSchema");
if (inputAvroSchema == null) {
- Set<Path> paths = new HashSet<Path>();
Configuration conf = job.getConfiguration();
- if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
+ // If a script stores to a location and then reads from that same
+ // location using AvroStorage, getPaths will return an empty set. Since
+ // getSchema is called during script parsing, we don't want to fail
+ // here if the path is not found.
+
+ Set<Path> paths = AvroStorageUtils.getPaths(location, conf, false);
+ if (!paths.isEmpty()) {
setInputAvroSchema(paths, conf);
}
}
@@ -617,8 +640,7 @@ public class AvroStorage extends FileInp
@Override
public void checkSchema(ResourceSchema s) throws IOException {
AvroStorageLog.funcCall("Check schema");
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(ResourceSchema.class);
+ Properties property = getUDFProperties();
String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
@@ -651,6 +673,14 @@ public class AvroStorage extends FileInp
AvroStorageLog.details("New schemas=" + newSchemaStr);
}
+ /**
+ * Returns UDFProperties based on <code>contextSignature</code>.
+ */
+ private Properties getUDFProperties() {
+ return UDFContext.getUDFContext()
+ .getUDFProperties(this.getClass(), new String[] {contextSignature});
+ }
+
private String getSchemaKey() {
return Integer.toString(storeFuncIndex);
}
@@ -678,8 +708,7 @@ public class AvroStorage extends FileInp
public OutputFormat getOutputFormat() throws IOException {
AvroStorageLog.funcCall("getOutputFormat");
- UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(ResourceSchema.class);
+ Properties property = getUDFProperties();
String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
Map<String, String> map = (allSchemaStr != null) ? parseSchemaMap(allSchemaStr) : null;
@@ -701,7 +730,7 @@ public class AvroStorage extends FileInp
@Override
public void setStoreFuncUDFContextSignature(String signature) {
- // Nothing to do
+ this.contextSignature = signature;
}
@Override
Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorageUtils.java Fri Apr 4 21:20:54 2014
@@ -17,12 +17,10 @@
package org.apache.pig.piggybank.storage.avro;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -30,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericDatumReader;
@@ -40,14 +37,10 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataType;
-import org.apache.pig.piggybank.storage.avro.AvroStorageLog;
-
import org.codehaus.jackson.JsonNode;
/**
* This is utility class for this package
@@ -100,51 +93,59 @@ public class AvroStorageUtils {
}
/**
- * get input paths to job config
- */
- public static boolean addInputPaths(String pathString, Job job) throws IOException {
- Configuration conf = job.getConfiguration();
- FileSystem fs = FileSystem.get(conf);
- HashSet<Path> paths = new HashSet<Path>();
- if (getAllSubDirs(new Path(pathString), conf, paths)) {
- paths.addAll(Arrays.asList(FileInputFormat.getInputPaths(job)));
- FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
- return true;
- }
- return false;
- }
-
- /**
- * Adds all non-hidden directories and subdirectories to set param
- * it supports comma-separated input paths and glob style path
+ * Resolves the given pathString, which may contain comma-separated paths
+ * and glob-style patterns, into the set of matching paths.
*
* @throws IOException
*/
- public static boolean getAllSubDirs(Path path, Configuration conf,
- Set<Path> paths) throws IOException {
- String[] pathStrs = LoadFunc.getPathStrings(path.toString());
+ public static Set<Path> getPaths(String pathString, Configuration conf, boolean failIfNotFound)
+ throws IOException {
+ Set<Path> paths = new HashSet<Path>();
+ String[] pathStrs = LoadFunc.getPathStrings(pathString);
for (String pathStr : pathStrs) {
FileSystem fs = FileSystem.get(new Path(pathStr).toUri(), conf);
FileStatus[] matchedFiles = fs.globStatus(new Path(pathStr), PATH_FILTER);
if (matchedFiles == null || matchedFiles.length == 0) {
- return false;
+ if (failIfNotFound) {
+ throw new IOException("Input Pattern " + pathStr + " matches 0 files");
+ } else {
+ continue;
+ }
}
for (FileStatus file : matchedFiles) {
- getAllSubDirsInternal(file, conf, paths, fs);
+ paths.add(file.getPath());
}
}
- return true;
+ return paths;
+ }
+
+ /**
+ * Returns all non-hidden files recursively inside the base paths given
+ *
+ * @throws IOException
+ */
+ public static Set<Path> getAllFilesRecursively(Set<Path> basePaths, Configuration conf) throws IOException {
+ Set<Path> paths = new HashSet<Path>();
+ for (Path path : basePaths) {
+ FileSystem fs = FileSystem.get(path.toUri(), conf);
+ FileStatus f = fs.getFileStatus(path);
+ if (f.isDir()) {
+ getAllFilesInternal(f, conf, paths, fs);
+ } else {
+ paths.add(path);
+ }
+ }
+ return paths;
}
- private static void getAllSubDirsInternal(FileStatus file, Configuration conf,
+ private static void getAllFilesInternal(FileStatus file, Configuration conf,
Set<Path> paths, FileSystem fs) throws IOException {
- if (file.isDir()) {
- for (FileStatus sub : fs.listStatus(file.getPath())) {
- getAllSubDirsInternal(sub, conf, paths, fs);
+ for (FileStatus f : fs.listStatus(file.getPath(), PATH_FILTER)) {
+ if (f.isDir()) {
+ getAllFilesInternal(f, conf, paths, fs);
+ } else {
+ paths.add(f.getPath());
}
- } else {
- AvroStorageLog.details("Add input file:" + file);
- paths.add(file.getPath());
}
}
@@ -160,22 +161,24 @@ public class AvroStorageUtils {
/** get last file of a hdfs path if it is a directory;
* or return the file itself if path is a file
*/
- public static Path getLast(String path, FileSystem fs) throws IOException {
- return getLast(new Path(path), fs);
- }
-
- /** get last file of a hdfs path if it is a directory;
- * or return the file itself if path is a file
- */
public static Path getLast(Path path, FileSystem fs) throws IOException {
+ FileStatus status = fs.getFileStatus(path);
+ if (!status.isDir()) {
+ return path;
+ }
FileStatus[] statuses = fs.listStatus(path, PATH_FILTER);
if (statuses.length == 0) {
- return path;
+ return null;
} else {
Arrays.sort(statuses);
- return statuses[statuses.length - 1].getPath();
+ for (int i = statuses.length - 1; i >= 0; i--) {
+ if (!statuses[i].isDir()) {
+ return statuses[i].getPath();
+ }
+ }
+ return null;
}
}
@@ -705,6 +708,9 @@ public class AvroStorageUtils {
public static Schema getSchema(Path path, FileSystem fs) throws IOException {
/* get path of the last file */
Path lastFile = AvroStorageUtils.getLast(path, fs);
+ if (lastFile == null) {
+ return null;
+ }
/* read in file and obtain schema */
GenericDatumReader<Object> avroReader = new GenericDatumReader<Object>();
Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroInputFormat.java Fri Apr 4 21:20:54 2014
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,9 +18,9 @@
package org.apache.pig.piggybank.storage.avro;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
/**
* The InputFormat for avro data.
@@ -86,4 +87,14 @@ public class PigAvroInputFormat extends
ignoreBadFiles, schemaToMergedSchemaMap, useMultipleSchemas);
}
+ /*
+ * This is to support multi-level/recursive directory listing until
+ * MAPREDUCE-1577 is fixed.
+ */
+ @Override
+ protected List<FileStatus> listStatus(JobContext job) throws IOException {
+ return MapRedUtil.getAllFileRecursively(super.listStatus(job),
+ job.getConfiguration());
+ }
+
}
Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroRecordReader.java Fri Apr 4 21:20:54 2014
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -127,6 +127,15 @@ public class PigAvroRecordReader extends
JsonNode defValue = subFields.get(i).defaultValue();
if (defValue != null) {
Schema.Type type = subFields.get(i).schema().getType();
+ if (type.equals(Schema.Type.UNION)) {
+ List<Schema> schemas = subFields.get(i).schema().getTypes();
+ for (Schema schema : schemas) {
+ if (!schema.getType().equals(Schema.Type.NULL)) {
+ type = schema.getType();
+ break;
+ }
+ }
+ }
switch (type) {
case BOOLEAN:
mProtoTuple.add(i, defValue.getBooleanValue());
Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java Fri Apr 4 21:20:54 2014
@@ -19,6 +19,16 @@ package org.apache.pig.piggybank.test.st
import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.schema;
import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
@@ -29,7 +39,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigServer;
@@ -41,26 +50,13 @@ import org.apache.pig.builtin.mock.Stora
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.piggybank.storage.avro.AvroStorage;
import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
-import org.apache.pig.test.MiniCluster;
import org.apache.pig.test.Util;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-import static junit.framework.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
public class TestAvroStorage {
protected static final Log LOG = LogFactory.getLog(TestAvroStorage.class);
@@ -594,6 +590,7 @@ public class TestAvroStorage {
@Test
public void testUserDefinedLoadSchema() throws IOException {
+ PigSchema2Avro.setTupleIndex(2);
// Verify that user specified schema correctly maps to input schemas
// Input Avro files have the following schemas:
// name:"string", address:[customField1:"int", addressLine:"string"]
@@ -604,7 +601,7 @@ public class TestAvroStorage {
// dropping, adding, and reordering fields where needed.
String output= outbasedir + "testUserDefinedLoadSchema";
String expected = basedir + "expected_testUserDefinedLoadSchema.avro";
- String customSchema =
+ String customSchema =
"{\"type\": \"record\", \"name\": \"employee\", \"fields\": [ "
+"{ \"default\": \"***\", \"type\": \"string\", \"name\": \"name\" }, "
+"{ \"name\": \"address\", \"type\": { "
@@ -621,7 +618,7 @@ public class TestAvroStorage {
" in = LOAD '" + testUserDefinedLoadSchemaFile
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('schema', '" + customSchema + "');",
" o = ORDER in BY name;",
- " STORE o INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
+ " STORE o INTO '" + output + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();"
};
testAvroStorage(queries);
verifyResults(output, expected);
@@ -1212,30 +1209,65 @@ public class TestAvroStorage {
// {"name":"age","type":["null","int"],"doc":"autogenerated from Pig Field Schema"},
// {"name":"gpa","type":["null","double"],"doc":"autogenerated from Pig Field Schema"}]}]
public void testLoadwithNullValues() throws IOException {
- //Input is supposed to have empty tuples
- PigSchema2Avro.setTupleIndex(0);
- Data data = resetData(pigServerLocal);
- String output = outbasedir + "testLoadwithNulls";
- deleteDirectory(new File(output));
- String [] queries = {
- " A = load '" + testLoadwithNullValues + "' USING " +
- " org.apache.pig.piggybank.storage.avro.AvroStorage(); ",
- " B = order A by name;",
- " store B into '" + output +"' USING mock.Storage();"
- };
- testAvroStorage(queries);
- List<Tuple> out = data.get(output);
- assertEquals(out + " size", 4, out.size());
-
- assertEquals(schema("name:chararray,age:int,gpa:double"), data.getSchema(output));
-
- // sorted data ordered by name
- assertEquals(tuple((String)null),out.get(0));
- assertEquals(tuple((String)null),out.get(1));
- assertEquals(tuple("calvin ellison", 24, 0.71), out.get(2));
- assertEquals(tuple("wendy johnson", 60, 0.07), out.get(3));
+ //Input is supposed to have empty tuples
+ PigSchema2Avro.setTupleIndex(0);
+ Data data = resetData(pigServerLocal);
+ String output = outbasedir + "testLoadwithNulls";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " A = load '" + testLoadwithNullValues + "' USING " +
+ " org.apache.pig.piggybank.storage.avro.AvroStorage(); ",
+ " B = order A by name;",
+ " store B into '" + output +"' USING mock.Storage();"
+ };
+ testAvroStorage(queries);
+ List<Tuple> out = data.get(output);
+ assertEquals(out + " size", 4, out.size());
- }
+ assertEquals(schema("name:chararray,age:int,gpa:double"), data.getSchema(output));
+
+ // sorted data ordered by name
+ assertEquals(tuple((String)null),out.get(0));
+ assertEquals(tuple((String)null),out.get(1));
+ assertEquals(tuple("calvin ellison", 24, 0.71), out.get(2));
+ assertEquals(tuple("wendy johnson", 60, 0.07), out.get(3));
+
+ }
+
+ @Test
+ public void testMultipleLoadStore() throws Exception {
+ PigSchema2Avro.setTupleIndex(0);
+ Data data = resetData(pigServerLocal);
+ data.set("foo",
+ tuple(1, 2, 3),
+ tuple(4, 5, 6),
+ tuple(7, 8, 9));
+ data.set("bar",
+ tuple("a", "b", "c"),
+ tuple("d", "e", "f"),
+ tuple("g", "h", "i"));
+ String output = outbasedir + "testMultipleLoadStore";
+ deleteDirectory(new File(output));
+ String[] storeQuery = {
+ "A = LOAD 'foo' USING " + "mock.Storage() as (a1:int, a2:int, a3:int);",
+ "B = LOAD 'bar' USING " + "mock.Storage() as (b1:chararray, b2:chararray, b3:chararray);",
+ "STORE A into '"+ output +"/A' USING " + "org.apache.pig.piggybank.storage.avro.AvroStorage();",
+ "STORE B into '"+ output +"/B' USING " + "org.apache.pig.piggybank.storage.avro.AvroStorage();"
+ };
+ testAvroStorage(storeQuery);
+ String[] loadQuery = {
+ "C = LOAD '"+ output +"/A' USING " + "org.apache.pig.piggybank.storage.avro.AvroStorage();",
+ "D = LOAD '"+ output +"/B' USING " + "org.apache.pig.piggybank.storage.avro.AvroStorage();",
+ "STORE C into 'foo-actual' USING mock.Storage();",
+ "STORE D into 'bar-actual' USING mock.Storage();"
+ };
+ testAvroStorage(loadQuery);
+
+ assertEquals(data.get("foo"), data.get("foo-actual"));
+ assertEquals(data.get("bar"), data.get("bar-actual"));
+ assertEquals("{a1: int,a2: int,a3: int}", data.getSchema("foo-actual").toString());
+ assertEquals("{b1: chararray,b2: chararray,b3: chararray}", data.getSchema("bar-actual").toString());
+ }
private static void deleteDirectory (File path) {
if ( path.exists()) {
Modified: pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java?rev=1584916&r1=1584915&r2=1584916&view=diff
==============================================================================
--- pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java (original)
+++ pig/branches/branch-0.12/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorageUtils.java Fri Apr 4 21:20:54 2014
@@ -19,15 +19,12 @@ package org.apache.pig.piggybank.test.st
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.piggybank.storage.avro.AvroStorageUtils;
-
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -93,37 +90,45 @@ public class TestAvroStorageUtils {
}
@Test
- public void testGetAllSubDirs() throws IOException {
- final String basedir = System.getProperty("user.dir");
+ public void testGetPaths() throws IOException {
+ final String basedir = "file://" + System.getProperty("user.dir");
final String tempdir = Long.toString(System.currentTimeMillis());
final String nonexistentpath = basedir + "/" + tempdir + "/this_path_does_not_exist";
String locationStr = null;
- Set<Path> paths = new HashSet<Path>();
+ Set<Path> paths;
Configuration conf = new Configuration();
// existent path
locationStr = basedir;
- assertTrue(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+ paths = AvroStorageUtils.getPaths(locationStr, conf, true);
assertFalse(paths.isEmpty());
- paths.clear();
// non-existent path
locationStr = nonexistentpath;
- assertFalse(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
- assertTrue(paths.isEmpty());
- paths.clear();
+ try {
+ paths = AvroStorageUtils.getPaths(locationStr, conf, true);
+ fail();
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("matches 0 files"));
+ }
// empty glob pattern
locationStr = basedir + "/{}";
- assertFalse(AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths));
+ try {
+ paths = AvroStorageUtils.getPaths(locationStr, conf, true);
+ fail();
+ } catch (IOException e) {
+ assertTrue(e.getMessage().contains("matches 0 files"));
+ }
+
+ paths = AvroStorageUtils.getPaths(locationStr, conf, false);
assertTrue(paths.isEmpty());
- paths.clear();
// bad glob pattern
locationStr = basedir + "/{1,";
try {
- AvroStorageUtils.getAllSubDirs(new Path(locationStr), conf, paths);
+ AvroStorageUtils.getPaths(locationStr, conf, true);
Assert.fail("Negative test to test illegal file pattern. Should not be succeeding!");
} catch (IOException e) {
// The message of the exception for illegal file pattern is rather long,