You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/15 00:26:47 UTC
svn commit: r704724 - in /incubator/pig/branches/types: ./
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/
src/org/apache/pig/builtin/ src/org/apache/pig/impl/io/
src/org/apache/pig/impl/logicalL...
Author: olga
Date: Tue Oct 14 15:26:46 2008
New Revision: 704724
URL: http://svn.apache.org/viewvc?rev=704724&view=rev
Log:
PIG-468: make determine schema work with binary storage
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java
incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java
incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Tue Oct 14 15:26:46 2008
@@ -282,3 +282,5 @@
PIG-489: (*) processing (sms via olgan)
PIG-475: missing heartbeats (shravanmn via olgan)
+
+ PIG-468: make determine Schema work for BinStorage (pradeepk via olgan)
Modified: incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java Tue Oct 14 15:26:46 2008
@@ -21,6 +21,7 @@
import java.net.URL;
import java.util.Map;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -150,9 +151,18 @@
* (not run time) to see if the loader can provide a schema for the data. The
* loader may be able to do this if the data is self describing (e.g. JSON). If
* the loader cannot determine the schema, it can return a null.
- * @param fileName Name of the file to be read.
+ * LoadFunc implementations that need to open the input "fileName" can use
+ * FileLocalizer.open(String fileName, ExecType execType, DataStorage storage) to get
+ * an InputStream with which to initialize their loader implementation. They
+ * can then read the input data through it to discover the schema. Note: this
+ * works only when fileName refers to a file on the local file system or the Hadoop
+ * file system.
+ * @param fileName Name of the file to be read (the same as the file name given
+ * in the "load" statement of the script).
+ * @param execType - execution mode of the pig script - one of ExecType.LOCAL or ExecType.MAPREDUCE
+ * @param storage - the DataStorage object corresponding to the execType
* @return a Schema describing the data if possible, or null otherwise.
* @throws IOException.
*/
- public Schema determineSchema(URL fileName) throws IOException;
+ public Schema determineSchema(String fileName, ExecType execType, DataStorage storage) throws IOException;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Tue Oct 14 15:26:46 2008
@@ -181,7 +181,9 @@
}
DataBag outBag = BagFactory.getInstance().newDefaultBag();
for (Tuple tuple : inpBag) {
- Tuple tmpTuple = tupleFactory.newTuple(tuple.get(columns.get(0)));
+ Tuple tmpTuple = tupleFactory.newTuple();
+ for (int i = 0; i < columns.size(); i++)
+ tmpTuple.append(tuple.get(columns.get(i)));
outBag.add(tmpTuple);
}
res.result = outBag;
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java Tue Oct 14 15:26:46 2008
@@ -23,18 +23,26 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.Iterator;
import java.util.Map;
+import org.apache.pig.ExecType;
import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataReaderWriter;
+import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
+import org.apache.pig.impl.util.WrappedIOException;
public class BinStorage implements ReversibleLoadStoreFunc {
@@ -199,9 +207,31 @@
}
}
- public Schema determineSchema(URL fileName) throws IOException {
- // TODO Auto-generated method stub
- return null;
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+ */
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ InputStream is = FileLocalizer.open(fileName, execType, storage);
+ bindTo(fileName, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+ // get the first record from the input file
+ // and figure out the schema from the data in
+ // the first record
+ Tuple t = getNext();
+ if(t == null) {
+ // we couldn't get a valid record from the input
+ return null;
+ }
+ int numFields = t.size();
+ Schema s = new Schema();
+ for (int i = 0; i < numFields; i++) {
+ try {
+ s.add(DataType.determineFieldSchema(t.get(i)));
+ } catch (Exception e) {
+ throw WrappedIOException.wrap(e);
+ }
+ }
+ return s;
}
public void fieldsToRead(Schema schema) {
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java Tue Oct 14 15:26:46 2008
@@ -22,8 +22,10 @@
import java.io.OutputStream;
import java.net.URL;
+import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -144,13 +146,14 @@
}
/* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#determineSchema(java.net.URL)
+ * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
*/
- public Schema determineSchema(URL fileName) throws IOException {
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
// TODO Auto-generated method stub
return null;
}
-
+
/* (non-Javadoc)
* @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
*/
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Tue Oct 14 15:26:46 2008
@@ -27,9 +27,11 @@
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
+import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.StoreFunc;
import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
@@ -217,7 +219,12 @@
mBuf.reset();
}
- public Schema determineSchema(URL fileName) throws IOException {
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+ */
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ // TODO Auto-generated method stub
return null;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java Tue Oct 14 15:26:46 2008
@@ -24,7 +24,9 @@
import java.nio.charset.Charset;
import java.util.Map;
+import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
@@ -144,7 +146,9 @@
/**
* TextLoader does not provide a schema.
*/
- public Schema determineSchema(URL fileName) throws IOException {
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ // TODO Auto-generated method stub
return null;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Tue Oct 14 15:26:46 2008
@@ -27,6 +27,7 @@
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Random;
import java.util.Stack;
import java.util.Properties ;
@@ -177,69 +178,107 @@
throw WrappedIOException.wrap("Failed to determine if elem=" + elem + " is container", e);
}
- ArrayList<ElementDescriptor> arrayList =
- new ArrayList<ElementDescriptor>();
- Iterator<ElementDescriptor> allElements =
- ((ContainerDescriptor)elem).iterator();
-
- while (allElements.hasNext()) {
- ElementDescriptor nextElement = allElements.next();
- if (!nextElement.systemElement()) {
- arrayList.add(nextElement);
- }
- }
-
- elements = new ElementDescriptor[ arrayList.size() ];
- arrayList.toArray(elements);
-
+ // elem is a directory - recursively get all files in it
+ elements = getFileElementDescriptors(elem);
} else {
// It might be a glob
if (!globMatchesFiles(elem, elem.getDataStorage())) {
throw new IOException(elem.toString() + " does not exist");
+ } else {
+ elements = getFileElementDescriptors(elem);
+ return new DataStorageInputStreamIterator(elements);
+
}
}
return new DataStorageInputStreamIterator(elements);
}
- private static InputStream openLFSFile(ElementDescriptor elem) throws IOException{
- ElementDescriptor[] elements = null;
-
- if (elem.exists()) {
- try {
- if(! elem.getDataStorage().isContainer(elem.toString())) {
- return elem.open();
+ /**
+ * Recursively collects the descriptors of all files found under the given element
+ * descriptor, descending into every directory level and skipping system elements.
+ * @param elem input element descriptor (a file, a directory or a glob)
+ * @return an array of element descriptors, one per file found under elem
+ * @throws DataStorageException if querying the underlying data storage fails
+ */
+ private static ElementDescriptor[] getFileElementDescriptors(ElementDescriptor elem) throws DataStorageException {
+ DataStorage store = elem.getDataStorage();
+ ElementDescriptor[] elems = store.asCollection(elem.toString());
+ // elems could have directories in it, if so
+ // get the files out so that it contains only files
+ List<ElementDescriptor> paths = new ArrayList<ElementDescriptor>();
+ List<ElementDescriptor> filePaths = new ArrayList<ElementDescriptor>();
+ for (int m = 0; m < elems.length; m++) {
+ paths.add(elems[m]);
+ }
+ for (int j = 0; j < paths.size(); j++) {
+ ElementDescriptor fullPath = store.asElement(store
+ .getActiveContainer(), paths.get(j));
+ // Skip hadoop's private/meta files ...
+ if (fullPath.systemElement()) {
+ continue;
+ }
+
+ if (fullPath instanceof ContainerDescriptor) {
+ for (ElementDescriptor child : ((ContainerDescriptor) fullPath)) {
+ paths.add(child);
}
+ continue;
+ } else {
+ // this is a file, add it to filePaths
+ filePaths.add(fullPath);
}
- catch (DataStorageException e) {
- throw WrappedIOException.wrap("Failed to determine if elem=" + elem + " is container", e);
- }
-
- ArrayList<ElementDescriptor> arrayList =
- new ArrayList<ElementDescriptor>();
- Iterator<ElementDescriptor> allElements =
- ((ContainerDescriptor)elem).iterator();
-
- while (allElements.hasNext()) {
- ElementDescriptor ed = allElements.next();
- int li = ed.toString().lastIndexOf(File.separatorChar);
- String fName = ed.toString().substring(li+1);
- if(fName.charAt(0)=='.')
- continue;
- arrayList.add(ed);
- }
-
- elements = new ElementDescriptor[ arrayList.size() ];
- arrayList.toArray(elements);
-
- } else {
- // It might be a glob
- if (!globMatchesFiles(elem, elem.getDataStorage())) {
- throw new IOException(elem.toString() + " does not exist");
+ }
+ elems = new ElementDescriptor[filePaths.size()];
+ filePaths.toArray(elems);
+ return elems;
+ }
+
+ private static InputStream openLFSFile(ElementDescriptor elem) throws IOException{
+ // IMPORTANT NOTE: Currently we use HXXX classes to represent
+ // files and dirs in local mode - so we can just delegate this
+ // call to openDFSFile(elem). When we have true local mode files
+ // and dirs THIS WILL NEED TO CHANGE
+ return openDFSFile(elem);
+ }
+
+ /**
+ * Returns an input stream for a file on the local file system or
+ * a file residing on Hadoop's DFS.
+ * @param fileName The name of the file to open
+ * @param execType execType indicating whether executing in local mode or MapReduce mode (Hadoop)
+ * @param storage The DataStorage object used to open fileName
+ * @return InputStream for fileName
+ * @throws IOException
+ */
+ static public InputStream open(String fileName, ExecType execType, DataStorage storage) throws IOException {
+ fileName = checkDefaultPrefix(execType, fileName);
+ if (!fileName.startsWith(LOCAL_PREFIX)) {
+ ElementDescriptor elem = storage.asElement(fullPath(fileName, storage));
+ return openDFSFile(elem);
+ }
+ else {
+ fileName = fileName.substring(LOCAL_PREFIX.length());
+ ElementDescriptor elem = storage.asElement(fullPath(fileName, storage));
+ return openLFSFile(elem);
+ }
+ }
+
+ private static String fullPath(String fileName, DataStorage storage) {
+ String fullPath;
+ try {
+ if (fileName.charAt(0) != '/') {
+ ElementDescriptor currentDir = storage.getActiveContainer();
+ ElementDescriptor elem = storage.asElement(currentDir.toString(), fileName);
+
+ fullPath = elem.toString();
+ } else {
+ fullPath = fileName;
}
+ } catch (DataStorageException e) {
+ fullPath = fileName;
}
-
- return new DataStorageInputStreamIterator(elements);
+ return fullPath;
}
static public InputStream open(String fileSpec, PigContext pigContext) throws IOException {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Oct 14 15:26:46 2008
@@ -20,26 +20,35 @@
import java.io.IOException;
import java.net.URL;
+import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.WrappedIOException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import sun.awt.motif.MDataTransferer;
+
public class LOLoad extends LogicalOperator {
private static final long serialVersionUID = 2L;
protected boolean splittable = true;
private FileSpec mInputFileSpec;
private LoadFunc mLoadFunc;
- private URL mSchemaFile;
+ private String mSchemaFile;
private Schema mEnforcedSchema = null ;
+ private DataStorage mStorage;
+ private ExecType mExecType;
private static Log log = LogFactory.getLog(LOLoad.class);
+ private Schema mDeterminedSchema = null;
/**
* @param plan
@@ -53,11 +62,15 @@
*
*/
public LOLoad(LogicalPlan plan, OperatorKey key, FileSpec inputFileSpec,
- URL schemaFile, boolean splittable) throws IOException {
+ ExecType execType, DataStorage storage, boolean splittable) throws IOException {
super(plan, key);
-
mInputFileSpec = inputFileSpec;
- mSchemaFile = schemaFile;
+ //mSchemaFile = schemaFile;
+ // schemaFile is the input file since we are trying
+ // to deduce the schema by looking at the input file
+ mSchemaFile = inputFileSpec.getFileName();
+ mStorage = storage;
+ mExecType = execType;
this.splittable = splittable;
try {
@@ -95,7 +108,7 @@
mInputFileSpec = inputFileSpec;
}
- public URL getSchemaFile() {
+ public String getSchemaFile() {
return mSchemaFile;
}
@@ -121,11 +134,13 @@
return mSchema ;
}
- if(null != mSchemaFile) {
- mSchema = mLoadFunc.determineSchema(mSchemaFile);
+ if(null == mDeterminedSchema) {
+ mSchema = mLoadFunc.determineSchema(mSchemaFile, mExecType, mStorage);
+ mDeterminedSchema = mSchema;
}
mIsSchemaComputed = true;
} catch (IOException ioe) {
+ ioe.printStackTrace();
FrontendException fee = new FrontendException(ioe.getMessage());
fee.initCause(ioe);
mIsSchemaComputed = false;
@@ -135,6 +150,35 @@
}
return mSchema;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.impl.logicalLayer.LogicalOperator#setSchema(org.apache.pig.impl.logicalLayer.schema.Schema)
+ */
+ @Override
+ public void setSchema(Schema schema) throws ParseException {
+ // In general, operators don't generate their schema until they're
+ // asked, so ask them to do it.
+ try {
+ getSchema();
+ } catch (FrontendException ioe) {
+ // It's fine, it just means we don't have a schema yet.
+ }
+ if (mSchema == null) {
+ log.debug("Operator schema is null; Setting it to new schema");
+ mSchema = schema;
+ } else {
+ log.debug("Reconciling schema");
+ log.debug("mSchema: " + mSchema + " schema: " + schema);
+ try {
+ mSchema = mSchema.mergePrefixSchema(schema, true, true);
+ } catch (SchemaMergeException e) {
+ ParseException pe = new ParseException("Unable to merge schemas");
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+ }
+
@Override
public boolean supportsMultipleInputs() {
@@ -166,4 +210,11 @@
return DataType.BAG ;
}
+ /**
+ * @return the DeterminedSchema
+ */
+ public Schema getDeterminedSchema() {
+ return mDeterminedSchema;
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Tue Oct 14 15:26:46 2008
@@ -120,7 +120,7 @@
* if there is already a schema and the existing schema cannot
* be reconciled with this new schema.
*/
- public final void setSchema(Schema schema) throws ParseException {
+ public void setSchema(Schema schema) throws ParseException {
// In general, operators don't generate their schema until they're
// asked, so ask them to do it.
try {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Tue Oct 14 15:26:46 2008
@@ -32,6 +32,7 @@
import org.apache.pig.impl.logicalLayer.LOStream;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
@@ -63,22 +64,35 @@
@Override
public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
try {
- Schema s = getOperator(nodes).getSchema();
+ LogicalOperator op = getOperator(nodes);
+ Schema s = op.getSchema();
if (s == null) return false;
boolean sawOne = false;
List<Schema.FieldSchema> fss = s.getFields();
List<Byte> types = new ArrayList<Byte>(s.size());
- for (Schema.FieldSchema fs : fss) {
- if (fs.type != DataType.BYTEARRAY) sawOne = true;
- types.add(fs.type);
+ Schema determinedSchema = null;
+ if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+ determinedSchema = ((LOLoad)op).getDeterminedSchema();
+ }
+ for (int i = 0; i < fss.size(); i++) {
+ if (fss.get(i).type != DataType.BYTEARRAY) {
+ if(determinedSchema == null ||
+ (fss.get(i).type != determinedSchema.getField(i).type)) {
+ // Either no schema was determined by loader OR the type
+ // from the "determinedSchema" is different
+ // from the type specified - so we need to cast
+ sawOne = true;
+ }
+ }
+ types.add(fss.get(i).type);
}
// If all we've found are byte arrays, we don't need a projection.
return sawOne;
- } catch (FrontendException fe) {
+ } catch (Exception e) {
throw new OptimizerException("Caught exception while trying to " +
- " check if type casts are needed", fe);
+ " check if type casts are needed", e);
}
}
@@ -117,6 +131,14 @@
ArrayList<LogicalPlan> genPlans = new ArrayList<LogicalPlan>(s.size());
ArrayList<Boolean> flattens = new ArrayList<Boolean>(s.size());
Map<String, Byte> typeChanges = new HashMap<String, Byte>();
+ // if we are inserting casts in a load and if the loader
+ // implements determineSchema(), insert casts only where necessary
+ // Note that in this case, the data coming out of the loader is not
+ // a BYTEARRAY but is whatever determineSchema() says it is.
+ Schema determinedSchema = null;
+ if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+ determinedSchema = ((LOLoad)lo).getDeterminedSchema();
+ }
for (int i = 0; i < s.size(); i++) {
LogicalPlan p = new LogicalPlan();
genPlans.add(p);
@@ -128,27 +150,37 @@
p.add(proj);
Schema.FieldSchema fs = s.getField(i);
if (fs.type != DataType.BYTEARRAY) {
- LOCast cast = new LOCast(p, OperatorKey.genOpKey(scope),
- proj, fs.type);
- p.add(cast);
- p.connect(proj, cast);
-
- cast.setFieldSchema(fs.clone());
- LoadFunc loadFunc = null;
- if(lo instanceof LOLoad) {
- loadFunc = ((LOLoad)lo).getLoadFunc();
- } else if (lo instanceof LOStream) {
- StreamingCommand command = ((LOStream)lo).getStreamingCommand();
- HandleSpec streamOutputSpec = command.getOutputSpec();
- loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
- } else {
- throw new OptimizerException("TypeCastInserter invoked with an invalid operator class name:" + lo.getClass().getSimpleName());
- }
- cast.setLoadFunc(loadFunc);
- typeChanges.put(fs.canonicalName, fs.type);
- // Reset the loads field schema to byte array so that it
- // will reflect reality.
- fs.type = DataType.BYTEARRAY;
+ if(determinedSchema == null || (fs.type != determinedSchema.getField(i).type)) {
+ // Either no schema was determined by loader OR the type
+ // from the "determinedSchema" is different
+ // from the type specified - so we need to cast
+ LOCast cast = new LOCast(p, OperatorKey.genOpKey(scope),
+ proj, fs.type);
+ p.add(cast);
+ p.connect(proj, cast);
+
+ cast.setFieldSchema(fs.clone());
+ LoadFunc loadFunc = null;
+ if(lo instanceof LOLoad) {
+ loadFunc = ((LOLoad)lo).getLoadFunc();
+ } else if (lo instanceof LOStream) {
+ StreamingCommand command = ((LOStream)lo).getStreamingCommand();
+ HandleSpec streamOutputSpec = command.getOutputSpec();
+ loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
+ } else {
+ throw new OptimizerException("TypeCastInserter invoked with an invalid operator class name:" + lo.getClass().getSimpleName());
+ }
+ cast.setLoadFunc(loadFunc);
+ typeChanges.put(fs.canonicalName, fs.type);
+ if(determinedSchema == null) {
+ // Reset the loads field schema to byte array so that it
+ // will reflect reality.
+ fs.type = DataType.BYTEARRAY;
+ } else {
+ // Reset the type to what determinedSchema says it is
+ fs.type = determinedSchema.getField(i).type;
+ }
+ }
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Oct 14 15:26:46 2008
@@ -1013,7 +1013,8 @@
}
try {
- lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null, splittable);
+ lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec),
+ pigContext.getExecType(), pigContext.getFs(), splittable);
} catch (IOException ioe) {
// The autogenerated parser code only catches RuntimeException and
// ParseException as special Exceptions. All others are caught as
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Oct 14 15:26:46 2008
@@ -428,22 +428,46 @@
*/
public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs) throws SchemaMergeException {
- return mergePrefixFieldSchema(otherFs, true);
+ return mergePrefixFieldSchema(otherFs, true, false);
}
/***
+ * Recursively prefix merge two schemas
+ * @param otherFs the other field schema to be merged with
+ * @param otherTakesAliasPrecedence true if aliases from the other
+ * field schema take precedence
+ * @return the prefix merged field schema (this can be null); mergeable types are
+ * not allowed, i.e. this is equivalent to passing allowMergeableTypes=false
+ *
+ * @throws SchemaMergeException if they cannot be merged
+ */
+
+ public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs,
+ boolean otherTakesAliasPrecedence)
+ throws SchemaMergeException {
+ return mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence, false);
+ }
+
+ /***
* Recursively prefix merge two schemas
* @param otherFs the other field schema to be merged with
* @param otherTakesAliasPrecedence true if aliases from the other
* field schema take precedence
- * @return the prefix merged field schema this can be null if one schema is null and
- * allowIncompatibleTypes is true
+ * @param allowMergeableTypes true if "mergeable" types should be allowed.
+ * Two types are mergeable if any of the following conditions is true IN THE
+ * BELOW ORDER of checks:
+ * 1) if either one has a type null or unknown and other has a type OTHER THAN
+ * null or unknown, the result type will be the latter non null/unknown type
+ * 2) If either type is bytearray, then result type will be the other (possibly non BYTEARRAY) type
+ * 3) If the current type can be cast to the other type, then the result type will be the
+ * other type
+ * @return the prefix merged field schema; this can be null.
*
* @throws SchemaMergeException if they cannot be merged
*/
public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs,
- boolean otherTakesAliasPrecedence)
+ boolean otherTakesAliasPrecedence, boolean allowMergeableTypes)
throws SchemaMergeException {
Schema.FieldSchema myFs = this;
Schema.FieldSchema mergedFs = null;
@@ -453,19 +477,29 @@
return myFs;
}
- if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN)
- && (otherFs.type == DataType.NULL || otherFs.type == DataType.UNKNOWN)) {
+ if(isNullOrUnknownType(myFs) && isNullOrUnknownType(otherFs)) {
throw new SchemaMergeException("Type mismatch. No useful type for merging. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
} else if(myFs.type == otherFs.type) {
mergedType = myFs.type;
- } else if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN)
- && (otherFs.type == DataType.NULL)) {
- mergedType = DataType.BYTEARRAY;
- } else if ((myFs.type != DataType.NULL && myFs.type != DataType.UNKNOWN)
- && (otherFs.type == DataType.NULL)) {
+ } else if (!isNullOrUnknownType(myFs) && isNullOrUnknownType(otherFs)) {
mergedType = myFs.type;
} else {
- throw new SchemaMergeException("Type mismatch. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+ if (allowMergeableTypes) {
+ if (isNullOrUnknownType(myFs) && !isNullOrUnknownType(otherFs)) {
+ mergedType = otherFs.type;
+ } else if(otherFs.type == DataType.BYTEARRAY) {
+ // just set mergeType to myFs's type (could even be BYTEARRAY)
+ mergedType = myFs.type;
+ } else {
+ if(castable(otherFs, myFs)) {
+ mergedType = otherFs.type;
+ } else {
+ throw new SchemaMergeException("Type mismatch. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+ }
+ }
+ } else {
+ throw new SchemaMergeException("Type mismatch. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+ }
}
String mergedAlias = mergeAlias(myFs.alias,
@@ -481,7 +515,7 @@
// merge inner schemas because both sides have schemas
if(null != myFs.schema) {
mergedSubSchema = myFs.schema.mergePrefixSchema(otherFs.schema,
- otherTakesAliasPrecedence);
+ otherTakesAliasPrecedence, allowMergeableTypes);
} else {
mergedSubSchema = otherFs.schema;
}
@@ -494,6 +528,10 @@
}
return mergedFs;
}
+
+ /**
+ * Returns true if the field schema's type is DataType.NULL or
+ * DataType.UNKNOWN, i.e. the field carries no useful type for merging.
+ * @param fs the field schema to inspect
+ * @return true for a NULL or UNKNOWN type, false otherwise
+ */
+ private boolean isNullOrUnknownType(FieldSchema fs) {
+ return (fs.type == DataType.NULL || fs.type == DataType.UNKNOWN);
+ }
}
@@ -1307,6 +1345,31 @@
public Schema mergePrefixSchema(Schema other,
boolean otherTakesAliasPrecedence)
throws SchemaMergeException {
+ // Strict merge: "mergeable" types are not allowed.
+ return mergePrefixSchema(other, otherTakesAliasPrecedence, false);
+ }
+
+ /***
+ * Recursively prefix merge two schemas
+ * @param other the other schema to be merged with
+ * @param otherTakesAliasPrecedence true if aliases from the other
+ * schema take precedence
+ * @param allowMergeableTypes true if "mergeable" types should be allowed.
+ * Two types are mergeable if any of the following conditions is true IN THE
+ * BELOW ORDER of checks:
+ * 1) if either one has a type null or unknown and other has a type OTHER THAN
+ * null or unknown, the result type will be the latter non null/unknown type
+ * 2) If either type is bytearray, then result type will be the other (possibly non BYTEARRAY) type
+ * 3) If current type can be casted to the other type, then the result type will be the
+ * other type
+ * The flag is passed down unchanged to every field-level merge.
+ * @return the prefix merged schema
+ *
+ * @throws SchemaMergeException if they cannot be merged
+ */
+
+ public Schema mergePrefixSchema(Schema other,
+ boolean otherTakesAliasPrecedence, boolean allowMergeableTypes)
+ throws SchemaMergeException {
Schema schema = this;
if (other == null) {
@@ -1332,7 +1395,7 @@
FieldSchema myFs = mylist.get(idx) ;
FieldSchema otherFs = otherlist.get(idx) ;
- FieldSchema mergedFs = myFs.mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence);
+ FieldSchema mergedFs = myFs.mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence, allowMergeableTypes);
outputList.add(mergedFs) ;
}
// if the first schema has leftover, then append the rest
Modified: incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java Tue Oct 14 15:26:46 2008
@@ -22,6 +22,7 @@
import java.net.URL;
import java.util.Map;
+import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
import org.apache.pig.Slice;
import org.apache.pig.Slicer;
@@ -193,9 +194,11 @@
}
/* (non-Javadoc)
- * @see org.apache.pig.LoadFunc#determineSchema(java.net.URL)
+ * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
*/
- public Schema determineSchema(URL fileName) throws IOException {
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ // This test slicer does not provide a schema; always returns null.
return null;
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Tue Oct 14 15:26:46 2008
@@ -524,5 +524,125 @@
assertEquals("value2", t.get(4).toString());
}
+
+
+ @Test
+ public void testBinStorageDetermineSchema() throws IOException, ExecException {
+ // Create input file with ascii data: a bag of two tuples, a tuple and a map
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
+
+ // First pass: load the text file with PigStorage and a declared schema
+ pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+ "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
+ pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
+ Iterator<Tuple> it = pigServer.openIterator("b");
+ Tuple t = it.next();
+ assertEquals(new Long(2), t.get(0));
+ assertEquals(1, t.get(1));
+ assertEquals(2, t.get(2));
+ assertEquals("value1", t.get(3).toString());
+ assertEquals("value2", t.get(4).toString());
+
+ //test with BinStorage
+ // NOTE(review): this re-registers alias 'a' with the same query as above
+ pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+ "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
+ String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema";
+ pigServer.deleteFile(output);
+ // Store the typed relation in BinStorage format so it can be reloaded below
+ pigServer.store("a", output, BinStorage.class.getName());
+ // test with different load specifications
+ // (untyped nested schema, top-level aliases only, and no schema at all)
+ String[] loads = {"p = load '" + output +"' using BinStorage() " +
+ "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);",
+ "p = load '" + output +"' using BinStorage() " +
+ "as (b, t2, m);",
+ "p = load '" + output +"' using BinStorage() ;"};
+ // the corresponding generate statements
+ String[] generates = {"q = foreach p generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2', b;",
+ "q = foreach p generate COUNT(b), t2.$0, t2.$1, m#'key1', m#'key2', b;",
+ "q = foreach p generate COUNT($0), $1.$0, $1.$1, $2#'key1', $2#'key2', $0;"};
+
+ // Every variant must yield the same values AND the same Java types as
+ // the typed data that was stored
+ for (int i = 0; i < loads.length; i++) {
+ pigServer.registerQuery(loads[i]);
+ pigServer.registerQuery(generates[i]);
+ it = pigServer.openIterator("q");
+ t = it.next();
+ assertEquals(new Long(2), t.get(0));
+ assertEquals(Integer.class, t.get(1).getClass());
+ assertEquals(1, t.get(1));
+ assertEquals(Integer.class, t.get(2).getClass());
+ assertEquals(2, t.get(2));
+ assertEquals("value1", t.get(3).toString());
+ assertEquals("value2", t.get(4).toString());
+ assertEquals(DefaultDataBag.class, t.get(5).getClass());
+ DataBag bg = (DataBag)t.get(5);
+ // bag members were declared chararray, so they must come back as String
+ for (Iterator<Tuple> bit = bg.iterator(); bit.hasNext();) {
+ Tuple bt = bit.next();
+ assertEquals(String.class, bt.get(0).getClass());
+ assertEquals(String.class, bt.get(1).getClass());
+ }
+ }
+ }
+
+ @Test
+ public void testBinStorageDetermineSchema2() throws IOException, ExecException {
+ // Create input file with ascii data
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"pigtester\t10\t1.2"});
+
+ // Load typed data with PigStorage and store it in BinStorage format
+ pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+ "as (name:chararray, age:int, gpa:double);");
+ String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema2";
+ pigServer.deleteFile(output);
+ pigServer.store("a", output, BinStorage.class.getName());
+ // test with different load specifications
+ // (full typed schema, aliases only, and no schema at all)
+ String[] loads = {"p = load '" + output +"' using BinStorage() " +
+ "as (name:chararray, age:int, gpa:double);",
+ "p = load '" + output +"' using BinStorage() " +
+ "as (name, age, gpa);",
+ "p = load '" + output +"' using BinStorage() ;"};
+ // the corresponding generate statements
+ String[] generates = {"q = foreach p generate name, age, gpa;",
+ "q = foreach p generate name, age, gpa;",
+ "q = foreach p generate $0, $1, $2;"};
+
+ // Each variant must return the stored values with their original Java types
+ for (int i = 0; i < loads.length; i++) {
+ pigServer.registerQuery(loads[i]);
+ pigServer.registerQuery(generates[i]);
+ Iterator<Tuple> it = pigServer.openIterator("q");
+ Tuple t = it.next();
+ assertEquals("pigtester", t.get(0));
+ assertEquals(String.class, t.get(0).getClass());
+ assertEquals(10, t.get(1));
+ assertEquals(Integer.class, t.get(1).getClass());
+ assertEquals(1.2, t.get(2));
+ assertEquals(Double.class, t.get(2).getClass());
+ }
+
+ // test that valid casting is allowed
+ // (stored int/double declared as long/float in the load schema)
+ pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
+ " as (name, age:long, gpa:float);");
+ pigServer.registerQuery("q = foreach p generate name, age, gpa;");
+ Iterator<Tuple> it = pigServer.openIterator("q");
+ Tuple t = it.next();
+ assertEquals("pigtester", t.get(0));
+ assertEquals(String.class, t.get(0).getClass());
+ assertEquals(10L, t.get(1));
+ assertEquals(Long.class, t.get(1).getClass());
+ assertEquals(1.2f, t.get(2));
+ assertEquals(Float.class, t.get(2).getClass());
+
+ // test that implicit casts work
+ // (untyped aliases used in a long addition and an explicit int cast)
+ pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
+ " as (name, age, gpa);");
+ pigServer.registerQuery("q = foreach p generate name, age + 1L, (int)gpa;");
+ it = pigServer.openIterator("q");
+ t = it.next();
+ assertEquals("pigtester", t.get(0));
+ assertEquals(String.class, t.get(0).getClass());
+ assertEquals(11L, t.get(1));
+ assertEquals(Long.class, t.get(1).getClass());
+ assertEquals(1, t.get(2));
+ assertEquals(Integer.class, t.get(2).getClass());
+ }
+
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java Tue Oct 14 15:26:46 2008
@@ -115,7 +115,7 @@
new FileSpec(inputFile, new FuncSpec("org.apache.pig.builtin.PigStorage")) ;
FileSpec filespec2 =
new FileSpec(outputFile, new FuncSpec("org.apache.pig.builtin.PigStorage"));
- LOLoad load = new LOLoad(plan, genNewOperatorKeyId(), filespec1, null, true) ;
+ LOLoad load = new LOLoad(plan, genNewOperatorKeyId(), filespec1, null, null, true) ;
LOStore store = new LOStore(plan, genNewOperatorKeyId(), filespec2) ;
plan.add(load) ;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue Oct 14 15:26:46 2008
@@ -38,6 +38,7 @@
import org.apache.pig.LoadFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigServer;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
@@ -47,14 +48,7 @@
import org.apache.pig.impl.builtin.GFAny;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LODefine;
import org.apache.pig.impl.logicalLayer.*;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.impl.logicalLayer.LOPrinter;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
@@ -395,10 +389,6 @@
return null;
}
- public Schema determineSchema(URL filename) {
- return null;
- }
-
public void fieldsToRead(Schema schema) {
}
@@ -470,6 +460,15 @@
public byte[] toBytes(Tuple t) throws IOException {
return null;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+ */
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ // Test stub: this loader does not provide a schema; always returns null.
+ return null;
+ }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Tue Oct 14 15:26:46 2008
@@ -55,6 +55,7 @@
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.junit.Before;
import org.apache.pig.test.utils.TestHelper;
@@ -245,10 +246,6 @@
public void fieldsToRead(Schema schema) {}
- public Schema determineSchema(URL fileName) throws IOException {
- return null;
- }
-
public byte[] toBytes(DataBag bag) throws IOException {
return null;
}
@@ -281,6 +278,15 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+ */
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ // Test stub: this loader does not provide a schema; always returns null.
+ return null;
+ }
+
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Tue Oct 14 15:26:46 2008
@@ -24,7 +24,9 @@
import java.util.Map;
import java.util.Random;
+import org.apache.pig.ExecType;
import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -627,10 +629,6 @@
return null;
}
- public Schema determineSchema(URL filename) {
- return null;
- }
-
public void fieldsToRead(Schema schema) {
}
@@ -707,6 +705,15 @@
public byte[] toBytes(Tuple t) throws IOException {
return null;
}
+
+ /* (non-Javadoc)
+ * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+ */
+ public Schema determineSchema(String fileName, ExecType execType,
+ DataStorage storage) throws IOException {
+ // Test stub: this loader does not provide a schema; always returns null.
+ return null;
+ }
}
@Test
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Tue Oct 14 15:26:46 2008
@@ -18,6 +18,8 @@
package org.apache.pig.test;
+import java.io.File;
+import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
@@ -49,8 +51,19 @@
simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
else
simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+ File fileA = new File("a");
+ File fileB = new File("b");
+ // Create the (empty) input files if they do not already exist.
+ // createNewFile() returns false when the file is already present, which
+ // is not an error here, so each file is handled independently. Only the
+ // files created by this setup are scheduled for deletion, so pre-existing
+ // files are left untouched.
+ try {
+ if (fileA.createNewFile())
+ fileA.deleteOnExit();
+ if (fileB.createNewFile())
+ fileB.deleteOnExit();
+ } catch (IOException e) {
+ fail("Unable to create input files:" + e.getMessage());
+ }
+ // Guard against a same-named directory (createNewFile() also returns false then)
+ if (!fileA.isFile() || !fileB.isFile())
+ fail("Unable to create input files");
}
-
+
@Test
public void testExpressionTypeChecking1() throws Throwable {
LogicalPlan plan = new LogicalPlan() ;
@@ -907,11 +920,11 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
LOLoad load2 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1031,11 +1044,11 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
LOLoad load2 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1203,7 +1216,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1271,7 +1284,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1341,7 +1354,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1411,7 +1424,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1491,7 +1504,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1589,7 +1602,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1684,7 +1697,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1812,7 +1825,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -1916,11 +1929,11 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
LOLoad load2 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -2087,11 +2100,11 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
LOLoad load2 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
@@ -2226,11 +2239,11 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
LOLoad load2 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// schema for input#1
Schema inputSchema1 = null ;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java Tue Oct 14 15:26:46 2008
@@ -445,7 +445,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// set schemas
load1.setEnforcedSchema(null) ;
@@ -511,7 +511,7 @@
LOLoad load1 = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
// set schemas
load1.setEnforcedSchema(null) ;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java Tue Oct 14 15:26:46 2008
@@ -18,6 +18,7 @@
package org.apache.pig.test.utils;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.logicalLayer.*;
import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
@@ -193,6 +194,11 @@
LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+ try {
+ pigContext.connect();
+ } catch (ExecException e1) {
+ fail(e1.getClass().getName() + ": " + e1.getMessage() + " -- " + query);
+ }
LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext);
try {
@@ -222,6 +228,7 @@
fail("IOException: " + e.getMessage());
}
catch (Exception e) {
+ e.printStackTrace();
fail(e.getClass().getName() + ": " + e.getMessage() + " -- " + query);
}
return null;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java Tue Oct 14 15:26:46 2008
@@ -42,7 +42,7 @@
LOLoad load = new LOLoad(plan,
genNewOperatorKey(),
new FileSpec("pi", new FuncSpec(pigStorage)),
- null, true) ;
+ null, null, true) ;
return load ;
} catch (IOException e) {
throw new AssertionError("This cannot happen") ;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java Tue Oct 14 15:26:46 2008
@@ -94,7 +94,7 @@
FileSpec fileSpec = new FileSpec("pi",
new FuncSpec(PigStorage.class.getName())) ;
try {
- load = new LOLoad(plan, getKey(node.attributes), fileSpec, null, true) ;
+ load = new LOLoad(plan, getKey(node.attributes), fileSpec, null, null, true) ;
fillSchema(load, node.attributes) ;
}
catch (IOException ioe) {