Posted to commits@pig.apache.org by ol...@apache.org on 2008/10/15 00:26:47 UTC

svn commit: r704724 - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/ src/org/apache/pig/builtin/ src/org/apache/pig/impl/io/ src/org/apache/pig/impl/logicalL...

Author: olga
Date: Tue Oct 14 15:26:46 2008
New Revision: 704724

URL: http://svn.apache.org/viewvc?rev=704724&view=rev
Log:
PIG-468: make determine schema work with binary storage

Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
    incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Tue Oct 14 15:26:46 2008
@@ -282,3 +282,5 @@
     PIG-489: (*) processing (sms via olgan)
 
     PIG-475: missing heartbeats (shravanmn via olgan)
+
+    PIG-468: make determine Schema work for BinStorage (pradeepk via olgan)

Modified: incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java Tue Oct 14 15:26:46 2008
@@ -21,6 +21,7 @@
 import java.net.URL;
 import java.util.Map;
 
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -150,9 +151,18 @@
      * (not run time) to see if the loader can provide a schema for the data.  The
      * loader may be able to do this if the data is self describing (e.g. JSON).  If
      * the loader cannot determine the schema, it can return a null.
-     * @param fileName Name of the file to be read.
+     * LoadFunc implementations which need to open the input "fileName" can use
+     * FileLocalizer.open(String fileName, ExecType execType, DataStorage storage) to get
+     * an InputStream with which to initialize their loader implementation. They
+     * can then read the input data to discover the schema. Note: this will
+     * work only when the fileName refers to a file on the local file system or on the
+     * Hadoop file system.
+     * @param fileName Name of the file to be read (this will be the same as the filename
+     * in the "load" statement of the script)
+     * @param execType - execution mode of the pig script - one of ExecType.LOCAL or ExecType.MAPREDUCE
+     * @param storage - the DataStorage object corresponding to the execType
      * @return a Schema describing the data if possible, or null otherwise.
      * @throws IOException.
      */
-    public Schema determineSchema(URL fileName) throws IOException;
+    public Schema determineSchema(String fileName, ExecType execType, DataStorage storage) throws IOException;
 }

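The three-argument signature above replaces determineSchema(URL). A minimal caller-side sketch of the new contract, assuming a connected PigContext (the SchemaProbe class is illustrative and not part of this commit; pigContext.getExecType() and pigContext.getFs() are the same accessors the parser passes to LOLoad later in this commit):

    import java.io.IOException;

    import org.apache.pig.LoadFunc;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class SchemaProbe {
        // Ask a loader at compile time whether it can describe its data;
        // null means the loader cannot determine a schema for this input.
        public static Schema probe(LoadFunc loader, String fileName, PigContext pigContext)
                throws IOException {
            return loader.determineSchema(fileName,
                                          pigContext.getExecType(),
                                          pigContext.getFs());
        }
    }
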
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Tue Oct 14 15:26:46 2008
@@ -181,7 +181,9 @@
         }
         DataBag outBag = BagFactory.getInstance().newDefaultBag();
         for (Tuple tuple : inpBag) {
-            Tuple tmpTuple = tupleFactory.newTuple(tuple.get(columns.get(0)));
+            Tuple tmpTuple = tupleFactory.newTuple();
+            for (int i = 0; i < columns.size(); i++)
+                tmpTuple.append(tuple.get(columns.get(i)));
             outBag.add(tmpTuple);
         }
         res.result = outBag;

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java Tue Oct 14 15:26:46 2008
@@ -23,18 +23,26 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URL;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataReaderWriter;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
+import org.apache.pig.impl.util.WrappedIOException;
 
 
 public class BinStorage implements ReversibleLoadStoreFunc {
@@ -199,9 +207,31 @@
         }
     }
 
-    public Schema determineSchema(URL fileName) throws IOException {
-        // TODO Auto-generated method stub
-        return null;
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+     */
+    public Schema determineSchema(String fileName, ExecType execType,
+            DataStorage storage) throws IOException {
+        InputStream is = FileLocalizer.open(fileName, execType, storage);
+        bindTo(fileName, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
+        // get the first record from the input file
+        // and figure out the schema from the data in
+        // the first record
+        Tuple t = getNext();
+        if(t == null) {
+            // we couldn't get a valid record from the input
+            return null;
+        }
+        int numFields = t.size();
+        Schema s = new Schema();
+        for (int i = 0; i < numFields; i++) {
+            try {
+                s.add(DataType.determineFieldSchema(t.get(i)));
+            } catch (Exception e) {
+                throw WrappedIOException.wrap(e);
+            } 
+        }
+        return s;
     }
 
     public void fieldsToRead(Schema schema) {

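The schema returned above simply mirrors the runtime classes found in the first record. A minimal sketch of the effect, assuming a BinStorage file whose first record is ("pigtester", 10, 1.2) as in the TestEvalPipeline case added below (the wrapper class is illustrative):

    import java.io.IOException;

    import org.apache.pig.ExecType;
    import org.apache.pig.backend.datastorage.DataStorage;
    import org.apache.pig.builtin.BinStorage;
    import org.apache.pig.impl.logicalLayer.schema.Schema;

    public class BinStorageSchemaExample {
        // DataType.determineFieldSchema maps String -> chararray, Integer -> int
        // and Double -> double, so the returned schema is (chararray, int, double).
        public static Schema firstRecordSchema(String fileName, ExecType execType,
                DataStorage storage) throws IOException {
            return new BinStorage().determineSchema(fileName, execType, storage);
        }
    }
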
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinaryStorage.java Tue Oct 14 15:26:46 2008
@@ -22,8 +22,10 @@
 import java.io.OutputStream;
 import java.net.URL;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -144,13 +146,14 @@
     }
 
     /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#determineSchema(java.net.URL)
+     * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
      */
-    public Schema determineSchema(URL fileName) throws IOException {
+    public Schema determineSchema(String fileName, ExecType execType,
+            DataStorage storage) throws IOException {
         // TODO Auto-generated method stub
         return null;
     }
-
+    
     /* (non-Javadoc)
      * @see org.apache.pig.LoadFunc#fieldsToRead(org.apache.pig.impl.logicalLayer.schema.Schema)
      */

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Tue Oct 14 15:26:46 2008
@@ -27,9 +27,11 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;
 import org.apache.pig.ReversibleLoadStoreFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -217,7 +219,12 @@
         mBuf.reset();
     }
 
-    public Schema determineSchema(URL fileName) throws IOException {
+    /* (non-Javadoc)
+     * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+     */
+    public Schema determineSchema(String fileName, ExecType execType,
+            DataStorage storage) throws IOException {
+        // TODO Auto-generated method stub
         return null;
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java Tue Oct 14 15:26:46 2008
@@ -24,7 +24,9 @@
 import java.nio.charset.Charset;
 import java.util.Map;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -144,7 +146,9 @@
     /**
      * TextLoader does not provide a schema.
      */
-    public Schema determineSchema(URL fileName) throws IOException {
+    public Schema determineSchema(String fileName, ExecType execType,
+            DataStorage storage) throws IOException {
+        // TODO Auto-generated method stub
         return null;
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Tue Oct 14 15:26:46 2008
@@ -27,6 +27,7 @@
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 import java.util.Stack;
 import java.util.Properties ;
@@ -177,69 +178,107 @@
                 throw WrappedIOException.wrap("Failed to determine if elem=" + elem + " is container", e);
             }
             
-            ArrayList<ElementDescriptor> arrayList = 
-                new ArrayList<ElementDescriptor>();
-            Iterator<ElementDescriptor> allElements = 
-                ((ContainerDescriptor)elem).iterator();
-            
-            while (allElements.hasNext()) {
-                ElementDescriptor nextElement = allElements.next();
-                if (!nextElement.systemElement()) {
-                    arrayList.add(nextElement);
-                }
-            }
-            
-            elements = new ElementDescriptor[ arrayList.size() ];
-            arrayList.toArray(elements);
-        
+            // elem is a directory - recursively get all files in it
+            elements = getFileElementDescriptors(elem);
         } else {
             // It might be a glob
             if (!globMatchesFiles(elem, elem.getDataStorage())) {
                 throw new IOException(elem.toString() + " does not exist");
+            } else {
+                elements = getFileElementDescriptors(elem); 
+                return new DataStorageInputStreamIterator(elements);
+                
             }
         }
         
         return new DataStorageInputStreamIterator(elements);
     }
     
-    private static InputStream openLFSFile(ElementDescriptor elem) throws IOException{
-        ElementDescriptor[] elements = null;
-        
-        if (elem.exists()) {
-            try {
-                if(! elem.getDataStorage().isContainer(elem.toString())) {
-                    return elem.open();
+    /**
+     * Recursively get all file (non-container) element descriptors present in the input element descriptor.
+     * @param elem input element descriptor
+     * @return an array of Element descriptors for files present (found by traversing all levels of dirs)
+     *  in the input element descriptor
+     * @throws DataStorageException
+     */
+    private static ElementDescriptor[] getFileElementDescriptors(ElementDescriptor elem) throws DataStorageException {
+        DataStorage store = elem.getDataStorage();
+        ElementDescriptor[] elems = store.asCollection(elem.toString());
+        // elems could have directories in it, if so
+        // get the files out so that it contains only files
+        List<ElementDescriptor> paths = new ArrayList<ElementDescriptor>();
+        List<ElementDescriptor> filePaths = new ArrayList<ElementDescriptor>();
+        for (int m = 0; m < elems.length; m++) {
+            paths.add(elems[m]);
+        }
+        for (int j = 0; j < paths.size(); j++) {
+            ElementDescriptor fullPath = store.asElement(store
+                    .getActiveContainer(), paths.get(j));
+            // Skip hadoop's private/meta files ...
+            if (fullPath.systemElement()) {
+                continue;
+            }
+            
+            if (fullPath instanceof ContainerDescriptor) {
+                for (ElementDescriptor child : ((ContainerDescriptor) fullPath)) {
+                    paths.add(child);
                 }
+                continue;
+            } else {
+                // this is a file, add it to filePaths
+                filePaths.add(fullPath);
             }
-            catch (DataStorageException e) {
-                throw WrappedIOException.wrap("Failed to determine if elem=" + elem + " is container", e);
-            }
-            
-            ArrayList<ElementDescriptor> arrayList = 
-                new ArrayList<ElementDescriptor>();
-            Iterator<ElementDescriptor> allElements = 
-                ((ContainerDescriptor)elem).iterator();
-            
-            while (allElements.hasNext()) {
-                ElementDescriptor ed = allElements.next();
-                int li = ed.toString().lastIndexOf(File.separatorChar);
-                String fName = ed.toString().substring(li+1);
-                if(fName.charAt(0)=='.')
-                    continue;
-                arrayList.add(ed);
-            }
-            
-            elements = new ElementDescriptor[ arrayList.size() ];
-            arrayList.toArray(elements);
-        
-        } else {
-            // It might be a glob
-            if (!globMatchesFiles(elem, elem.getDataStorage())) {
-                throw new IOException(elem.toString() + " does not exist");
+        }
+        elems = new ElementDescriptor[filePaths.size()];
+        filePaths.toArray(elems);
+        return elems;
+    }
+    
+    private static InputStream openLFSFile(ElementDescriptor elem) throws IOException{
+        // IMPORTANT NOTE: Currently we use HXXX classes to represent
+        // files and dirs in local mode - so we can just delegate this
+        // call to openDFSFile(elem). When we have true local mode files
+        // and dirs THIS WILL NEED TO CHANGE
+        return openDFSFile(elem);
+    }
+    
+    /**
+     * This function returns an input stream to a local file system file or
+     * a file residing on Hadoop's DFS
+     * @param fileName The filename to open
+     * @param execType execution mode indicating whether running in local mode or MapReduce mode (Hadoop)
+     * @param storage The DataStorage object used to open the fileName
+     * @return InputStream to the fileName
+     * @throws IOException
+     */
+    static public InputStream open(String fileName, ExecType execType, DataStorage storage) throws IOException {
+        fileName = checkDefaultPrefix(execType, fileName);
+        if (!fileName.startsWith(LOCAL_PREFIX)) {
+            ElementDescriptor elem = storage.asElement(fullPath(fileName, storage));
+            return openDFSFile(elem);
+        }
+        else {
+            fileName = fileName.substring(LOCAL_PREFIX.length());
+            ElementDescriptor elem = storage.asElement(fullPath(fileName, storage));
+            return openLFSFile(elem);
+        }
+    }
+    
+    private static String fullPath(String fileName, DataStorage storage) {
+        String fullPath;
+        try {
+            if (fileName.charAt(0) != '/') {
+                ElementDescriptor currentDir = storage.getActiveContainer();
+                ElementDescriptor elem = storage.asElement(currentDir.toString(), fileName);
+                
+                fullPath = elem.toString();
+            } else {
+                fullPath = fileName;
             }
+        } catch (DataStorageException e) {
+            fullPath = fileName;
         }
-        
-        return new DataStorageInputStreamIterator(elements);
+        return fullPath;
     }
     
     static public InputStream open(String fileSpec, PigContext pigContext) throws IOException {

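Together with the recursive getFileElementDescriptors() above, the new open() overload lets a caller read a plain file, a directory (all non-system files under it, at any depth), or a glob through a single InputStream. A minimal sketch of how a loader might use it, mirroring what BinStorage.determineSchema does before calling bindTo() (the InputOpener class is illustrative):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.pig.ExecType;
    import org.apache.pig.backend.datastorage.DataStorage;
    import org.apache.pig.impl.io.BufferedPositionedInputStream;
    import org.apache.pig.impl.io.FileLocalizer;

    public class InputOpener {
        // Open fileName on the local or Hadoop file system and wrap it in the
        // positioned stream that LoadFunc.bindTo() expects.
        public static BufferedPositionedInputStream openForSchema(String fileName,
                ExecType execType, DataStorage storage) throws IOException {
            InputStream is = FileLocalizer.open(fileName, execType, storage);
            return new BufferedPositionedInputStream(is);
        }
    }
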
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Oct 14 15:26:46 2008
@@ -20,26 +20,33 @@
 import java.io.IOException;
 import java.net.URL;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.WrappedIOException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 public class LOLoad extends LogicalOperator {
     private static final long serialVersionUID = 2L;
     protected boolean splittable = true;
 
     private FileSpec mInputFileSpec;
     private LoadFunc mLoadFunc;
-    private URL mSchemaFile;
+    private String mSchemaFile;
     private Schema mEnforcedSchema = null ;
+    private DataStorage mStorage;
+    private ExecType mExecType;
     private static Log log = LogFactory.getLog(LOLoad.class);
+    private Schema mDeterminedSchema = null;
 
     /**
      * @param plan
@@ -53,11 +62,15 @@
      * 
      */
     public LOLoad(LogicalPlan plan, OperatorKey key, FileSpec inputFileSpec,
-            URL schemaFile, boolean splittable) throws IOException {
+            ExecType execType, DataStorage storage, boolean splittable) throws IOException {
         super(plan, key);
-
         mInputFileSpec = inputFileSpec;
-        mSchemaFile = schemaFile;
+        //mSchemaFile = schemaFile;
+        // schemaFile is the input file since we are trying
+        // to deduce the schema by looking at the input file
+        mSchemaFile = inputFileSpec.getFileName();
+        mStorage = storage;
+        mExecType = execType;
         this.splittable = splittable;
 
          try { 
@@ -95,7 +108,7 @@
         mInputFileSpec = inputFileSpec;
     }
 
-    public URL getSchemaFile() {
+    public String getSchemaFile() {
         return mSchemaFile;
     }
 
@@ -121,11 +134,13 @@
                     return mSchema ;
                 }
 
-                if(null != mSchemaFile) {
-                    mSchema = mLoadFunc.determineSchema(mSchemaFile);
+                if(null == mDeterminedSchema) {
+                    mSchema = mLoadFunc.determineSchema(mSchemaFile, mExecType, mStorage);
+                    mDeterminedSchema  = mSchema;
                 }
                 mIsSchemaComputed = true;
             } catch (IOException ioe) {
+                ioe.printStackTrace();
                 FrontendException fee = new FrontendException(ioe.getMessage());
                 fee.initCause(ioe);
                 mIsSchemaComputed = false;
@@ -135,6 +150,35 @@
         }
         return mSchema;
     }
+    
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#setSchema(org.apache.pig.impl.logicalLayer.schema.Schema)
+     */
+    @Override
+    public void setSchema(Schema schema) throws ParseException {
+        // In general, operators don't generate their schema until they're
+        // asked, so ask them to do it.
+        try {
+            getSchema();
+        } catch (FrontendException ioe) {
+            // It's fine, it just means we don't have a schema yet.
+        }
+        if (mSchema == null) {
+            log.debug("Operator schema is null; Setting it to new schema");
+            mSchema = schema;
+        } else {
+            log.debug("Reconciling schema");
+            log.debug("mSchema: " + mSchema + " schema: " + schema);
+            try {
+                mSchema = mSchema.mergePrefixSchema(schema, true, true);
+            } catch (SchemaMergeException e) {
+                ParseException pe = new ParseException("Unable to merge schemas");
+                pe.initCause(e);
+                throw pe;
+            }
+        }
+    }
+    
 
     @Override
     public boolean supportsMultipleInputs() {
@@ -166,4 +210,11 @@
         return DataType.BAG ;
     }
 
+    /**
+     * @return the DeterminedSchema
+     */
+    public Schema getDeterminedSchema() {
+        return mDeterminedSchema;
+    }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Tue Oct 14 15:26:46 2008
@@ -120,7 +120,7 @@
      *             if there is already a schema and the existing schema cannot
      *             be reconciled with this new schema.
      */
-    public final void setSchema(Schema schema) throws ParseException {
+    public void setSchema(Schema schema) throws ParseException {
         // In general, operators don't generate their schema until they're
         // asked, so ask them to do it.
         try {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Tue Oct 14 15:26:46 2008
@@ -32,6 +32,7 @@
 import org.apache.pig.impl.logicalLayer.LOStream;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -63,22 +64,35 @@
     @Override
     public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
         try {
-            Schema s = getOperator(nodes).getSchema();
+            LogicalOperator op = getOperator(nodes);
+            Schema s = op.getSchema();
             if (s == null) return false;
     
             boolean sawOne = false;
             List<Schema.FieldSchema> fss = s.getFields();
             List<Byte> types = new ArrayList<Byte>(s.size());
-            for (Schema.FieldSchema fs : fss) {
-                if (fs.type != DataType.BYTEARRAY) sawOne = true;
-                types.add(fs.type);
+            Schema determinedSchema = null;
+            if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+                determinedSchema = ((LOLoad)op).getDeterminedSchema();
+            }
+            for (int i = 0; i < fss.size(); i++) {
+                if (fss.get(i).type != DataType.BYTEARRAY) {
+                    if(determinedSchema == null || 
+                            (fss.get(i).type != determinedSchema.getField(i).type)) {
+                            // Either no schema was determined by loader OR the type 
+                            // from the "determinedSchema" is different
+                            // from the type specified - so we need to cast
+                            sawOne = true;
+                        }
+                }
+                types.add(fss.get(i).type);
             }
 
             // If all we've found are byte arrays, we don't need a projection.
             return sawOne;
-        } catch (FrontendException fe) {
+        } catch (Exception e) {
             throw new OptimizerException("Caught exception while trying to " +
-                " check if type casts are needed", fe);
+                " check if type casts are needed", e);
         }
     }
     
@@ -117,6 +131,14 @@
             ArrayList<LogicalPlan> genPlans = new ArrayList<LogicalPlan>(s.size());
             ArrayList<Boolean> flattens = new ArrayList<Boolean>(s.size());
             Map<String, Byte> typeChanges = new HashMap<String, Byte>();
+            // if we are inserting casts in a load and if the loader
+            // implements determineSchema(), insert casts only where necessary
+            // Note that in this case, the data coming out of the loader is not
+            // a BYTEARRAY but is whatever determineSchema() says it is.
+            Schema determinedSchema = null;
+            if(operatorClassName == LogicalOptimizer.LOLOAD_CLASSNAME) {
+                determinedSchema = ((LOLoad)lo).getDeterminedSchema();
+            }
             for (int i = 0; i < s.size(); i++) {
                 LogicalPlan p = new LogicalPlan();
                 genPlans.add(p);
@@ -128,27 +150,37 @@
                 p.add(proj);
                 Schema.FieldSchema fs = s.getField(i);
                 if (fs.type != DataType.BYTEARRAY) {
-                    LOCast cast = new LOCast(p, OperatorKey.genOpKey(scope),
-                        proj, fs.type);
-                    p.add(cast);
-                    p.connect(proj, cast);
-                    
-                    cast.setFieldSchema(fs.clone());
-                    LoadFunc loadFunc = null;
-                    if(lo instanceof LOLoad) {
-                        loadFunc = ((LOLoad)lo).getLoadFunc();
-                    } else if (lo instanceof LOStream) {
-                        StreamingCommand command = ((LOStream)lo).getStreamingCommand();
-                        HandleSpec streamOutputSpec = command.getOutputSpec(); 
-                        loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
-                    } else {
-                        throw new OptimizerException("TypeCastInserter invoked with an invalid operator class name:" + lo.getClass().getSimpleName());
-                    }
-                    cast.setLoadFunc(loadFunc);
-                    typeChanges.put(fs.canonicalName, fs.type);
-                    // Reset the loads field schema to byte array so that it
-                    // will reflect reality.
-                    fs.type = DataType.BYTEARRAY;
+                    if(determinedSchema == null || (fs.type != determinedSchema.getField(i).type)) {
+                            // Either no schema was determined by loader OR the type 
+                            // from the "determinedSchema" is different
+                            // from the type specified - so we need to cast
+                            LOCast cast = new LOCast(p, OperatorKey.genOpKey(scope),
+                                proj, fs.type);
+                            p.add(cast);
+                            p.connect(proj, cast);
+                            
+                            cast.setFieldSchema(fs.clone());
+                            LoadFunc loadFunc = null;
+                            if(lo instanceof LOLoad) {
+                                loadFunc = ((LOLoad)lo).getLoadFunc();
+                            } else if (lo instanceof LOStream) {
+                                StreamingCommand command = ((LOStream)lo).getStreamingCommand();
+                                HandleSpec streamOutputSpec = command.getOutputSpec(); 
+                                loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(streamOutputSpec.getSpec());
+                            } else {
+                                throw new OptimizerException("TypeCastInserter invoked with an invalid operator class name:" + lo.getClass().getSimpleName());
+                            }
+                            cast.setLoadFunc(loadFunc);
+                            typeChanges.put(fs.canonicalName, fs.type);
+                            if(determinedSchema == null) {
+                                // Reset the loads field schema to byte array so that it
+                                // will reflect reality.
+                                fs.type = DataType.BYTEARRAY;
+                            } else {
+                                // Reset the type to what determinedSchema says it is
+                                fs.type = determinedSchema.getField(i).type;
+                            }
+                        }
                 }
             }
 

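The practical effect of the change above: when the loader reports a schema, a cast is inserted only where the declared type differs from the reported type. A sketch mirroring the TestEvalPipeline cases added below (the CastInsertionExample class and the output path parameter are illustrative):

    import java.io.IOException;

    import org.apache.pig.PigServer;

    public class CastInsertionExample {
        // BinStorage reports (chararray, int, double) for the stored data, so
        // no cast is inserted for name, while age (int -> long) and
        // gpa (double -> float) each get a cast.
        public static void loadWithMinimalCasts(PigServer pigServer, String output)
                throws IOException {
            pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
                    "as (name:chararray, age:long, gpa:float);");
            pigServer.registerQuery("q = foreach p generate name, age, gpa;");
        }
    }
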
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Oct 14 15:26:46 2008
@@ -1013,7 +1013,8 @@
 		}
 
         try {
-		    lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null, splittable);
+		    lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec),
+		              pigContext.getExecType(), pigContext.getFs(), splittable);
         } catch (IOException ioe) {
             // The autogenerated parser code only catches RuntimeException and
             // ParseException as special Exceptions. All others are caught as

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Oct 14 15:26:46 2008
@@ -428,22 +428,46 @@
         */
 
         public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs) throws SchemaMergeException {
-            return mergePrefixFieldSchema(otherFs, true);
+            return mergePrefixFieldSchema(otherFs, true, false);
         }
 
         /***
+         * Recursively prefix merge two schemas
+         * @param otherFs the other field schema to be merged with
+         * @param otherTakesAliasPrecedence true if aliases from the other
+         *                                  field schema take precedence
+         * @return the prefix merged field schema; this can be null if one schema is null and
+         *         allowIncompatibleTypes is true
+         *
+         * @throws SchemaMergeException if they cannot be merged
+         */
+
+         public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs,
+                                             boolean otherTakesAliasPrecedence)
+                                                 throws SchemaMergeException {
+             return mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence, false);
+         }
+         
+        /***
         * Recursively prefix merge two schemas
         * @param otherFs the other field schema to be merged with
         * @param otherTakesAliasPrecedence true if aliases from the other
         *                                  field schema take precedence
-        * @return the prefix merged field schema this can be null if one schema is null and
-        *         allowIncompatibleTypes is true
+        * @param allowMergeableTypes true if "mergeable" types should be allowed.
+        *   Two types are mergeable if any of the following conditions is true IN THE
+        *   BELOW ORDER of checks:
+        *   1) if either one has a type null or unknown and other has a type OTHER THAN
+        *   null or unknown, the result type will be the latter non null/unknown type
+        *   2) If either type is bytearray, then result type will be the other (possibly non BYTEARRAY) type
+        *   3) If the current type can be cast to the other type, then the result type will be the
+        *   other type
+        * @return the prefix merged field schema; this can be null.
         *
         * @throws SchemaMergeException if they cannot be merged
         */
 
         public Schema.FieldSchema mergePrefixFieldSchema(Schema.FieldSchema otherFs,
-                                            boolean otherTakesAliasPrecedence)
+                                            boolean otherTakesAliasPrecedence, boolean allowMergeableTypes)
                                                 throws SchemaMergeException {
             Schema.FieldSchema myFs = this;
             Schema.FieldSchema mergedFs = null;
@@ -453,19 +477,29 @@
                 return myFs;
             }
 
-            if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN)
-                && (otherFs.type == DataType.NULL || otherFs.type == DataType.UNKNOWN)) {
+            if(isNullOrUnknownType(myFs) && isNullOrUnknownType(otherFs)) {
                 throw new SchemaMergeException("Type mismatch. No useful type for merging. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
             } else if(myFs.type == otherFs.type) {
                 mergedType = myFs.type;
-            } else if((myFs.type == DataType.NULL || myFs.type == DataType.UNKNOWN) 
-                && (otherFs.type == DataType.NULL)) {
-                mergedType = DataType.BYTEARRAY;
-            } else if ((myFs.type != DataType.NULL && myFs.type != DataType.UNKNOWN)
-                && (otherFs.type == DataType.NULL)) {
+            } else if (!isNullOrUnknownType(myFs) && isNullOrUnknownType(otherFs)) {
                 mergedType = myFs.type;
             } else {
-                throw new SchemaMergeException("Type mismatch. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+                if (allowMergeableTypes) {
+                    if (isNullOrUnknownType(myFs) && !isNullOrUnknownType(otherFs)) {
+                        mergedType = otherFs.type;
+                    }  else if(otherFs.type == DataType.BYTEARRAY) {
+                        // just set mergeType to myFs's type (could even be BYTEARRAY)
+                        mergedType = myFs.type;
+                    } else {
+                        if(castable(otherFs, myFs)) {
+                            mergedType = otherFs.type;
+                        } else {
+                            throw new SchemaMergeException("Type mismatch. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+                        }
+                    }
+                } else {
+                    throw new SchemaMergeException("Type mismatch. Field Schema: " + myFs + ". Other Field Schema: " + otherFs);
+                }
             }
     
             String mergedAlias = mergeAlias(myFs.alias,
@@ -481,7 +515,7 @@
                 // merge inner schemas because both sides have schemas
                 if(null != myFs.schema) {
                     mergedSubSchema = myFs.schema.mergePrefixSchema(otherFs.schema,
-                                                     otherTakesAliasPrecedence);
+                                                     otherTakesAliasPrecedence, allowMergeableTypes);
                 } else {
                     mergedSubSchema = otherFs.schema;
                 }
@@ -494,6 +528,10 @@
             }
             return mergedFs;
         }
+        
+        private boolean isNullOrUnknownType(FieldSchema fs) {
+            return (fs.type == DataType.NULL || fs.type == DataType.UNKNOWN);
+        }
 
     }
 
@@ -1307,6 +1345,31 @@
     public Schema mergePrefixSchema(Schema other,
                                boolean otherTakesAliasPrecedence)
                                     throws SchemaMergeException {
+        return mergePrefixSchema(other, otherTakesAliasPrecedence, false);
+    }
+    
+    /***
+     * Recursively prefix merge two schemas
+     * @param other the other schema to be merged with
+     * @param otherTakesAliasPrecedence true if aliases from the other
+     *                                  schema take precedence
+     * @param allowMergeableTypes true if "mergeable" types should be allowed.
+     *   Two types are mergeable if any of the following conditions is true IN THE 
+     *   BELOW ORDER of checks:
+     *   1) if either one has a type null or unknown and other has a type OTHER THAN
+     *   null or unknown, the result type will be the latter non null/unknown type
+     *   2) If either type is bytearray, then result type will be the other (possibly  non BYTEARRAY) type
+     *   3) If the current type can be cast to the other type, then the result type will be the
+     *   other type
+     * @return the prefix merged schema; this can be null if one schema is null and
+     *         allowIncompatibleTypes is true
+     *
+     * @throws SchemaMergeException if they cannot be merged
+     */
+
+    public Schema mergePrefixSchema(Schema other,
+                               boolean otherTakesAliasPrecedence, boolean allowMergeableTypes)
+                                    throws SchemaMergeException {
         Schema schema = this;
 
         if (other == null) {
@@ -1332,7 +1395,7 @@
             FieldSchema myFs = mylist.get(idx) ;
             FieldSchema otherFs = otherlist.get(idx) ;
 
-            FieldSchema mergedFs = myFs.mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence);
+            FieldSchema mergedFs = myFs.mergePrefixFieldSchema(otherFs, otherTakesAliasPrecedence, allowMergeableTypes);
             outputList.add(mergedFs) ;
         }
         // if the first schema has leftover, then append the rest

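A worked example of the new allowMergeableTypes behavior, as LOLoad.setSchema uses it to reconcile a loader-determined schema with the script's declared schema (the aliases and literal types are illustrative; the outcomes follow rules 2 and 3 in the javadoc above and match the BinStorage tests added below):

    import org.apache.pig.data.DataType;
    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;

    public class MergeableTypesExample {
        public static void main(String[] args) throws SchemaMergeException {
            // Field type determined by the loader (e.g. BinStorage): an int.
            Schema.FieldSchema determined = new Schema.FieldSchema("age", DataType.INTEGER);
            // Field types declared in the script's "as" clause.
            Schema.FieldSchema declaredLong = new Schema.FieldSchema("age", DataType.LONG);
            Schema.FieldSchema declaredUntyped = new Schema.FieldSchema("age", DataType.BYTEARRAY);

            // Rule 3: int can be cast to long, so the declared type wins -> long.
            Schema.FieldSchema m1 = determined.mergePrefixFieldSchema(declaredLong, true, true);
            // Rule 2: the declared side is bytearray, so the determined type wins -> int.
            Schema.FieldSchema m2 = determined.mergePrefixFieldSchema(declaredUntyped, true, true);

            System.out.println(DataType.findTypeName(m1.type) + ", " + DataType.findTypeName(m2.type));
        }
    }
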
Modified: incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/RangeSlicer.java Tue Oct 14 15:26:46 2008
@@ -22,6 +22,7 @@
 import java.net.URL;
 import java.util.Map;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.Slice;
 import org.apache.pig.Slicer;
@@ -193,9 +194,11 @@
     }
 
     /* (non-Javadoc)
-     * @see org.apache.pig.LoadFunc#determineSchema(java.net.URL)
+     * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
      */
-    public Schema determineSchema(URL fileName) throws IOException {
+    public Schema determineSchema(String fileName, ExecType execType,
+            DataStorage storage) throws IOException {
+        // TODO Auto-generated method stub
         return null;
     }
 

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Tue Oct 14 15:26:46 2008
@@ -524,5 +524,125 @@
         assertEquals("value2", t.get(4).toString());
         
     }
+    
+    
+    @Test
+    public void testBinStorageDetermineSchema() throws IOException, ExecException {
+        // Create input file with ascii data
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"{(f1, f2),(f3, f4)}\t(1,2)\t[key1#value1,key2#value2]"});
+        
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+                "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
+        pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
+        Iterator<Tuple> it = pigServer.openIterator("b");
+        Tuple t = it.next();
+        assertEquals(new Long(2), t.get(0));
+        assertEquals(1, t.get(1));
+        assertEquals(2, t.get(2));
+        assertEquals("value1", t.get(3).toString());
+        assertEquals("value2", t.get(4).toString());
+        
+        //test with BinStorage
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+                "as (b:bag{t:tuple(x:chararray,y:chararray)}, t2:tuple(a:int,b:int), m:map[]);");
+        String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema";
+        pigServer.deleteFile(output);
+        pigServer.store("a", output, BinStorage.class.getName());
+        // test with different load specifications
+        String[] loads = {"p = load '" + output +"' using BinStorage() " +
+                "as (b:bag{t:tuple(x,y)}, t2:tuple(a,b), m:map[]);",
+                "p = load '" + output +"' using BinStorage() " +
+                "as (b, t2, m);",
+                "p = load '" + output +"' using BinStorage() ;"};
+        // the corresponding generate statements
+        String[] generates = {"q = foreach p generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2', b;",
+                "q = foreach p generate COUNT(b), t2.$0, t2.$1, m#'key1', m#'key2', b;",
+                "q = foreach p generate COUNT($0), $1.$0, $1.$1, $2#'key1', $2#'key2', $0;"};
+        
+        for (int i = 0; i < loads.length; i++) {
+            pigServer.registerQuery(loads[i]);
+            pigServer.registerQuery(generates[i]);
+            it = pigServer.openIterator("q");
+            t = it.next();
+            assertEquals(new Long(2), t.get(0));
+            assertEquals(Integer.class, t.get(1).getClass());
+            assertEquals(1, t.get(1));
+            assertEquals(Integer.class, t.get(2).getClass());
+            assertEquals(2, t.get(2));
+            assertEquals("value1", t.get(3).toString());
+            assertEquals("value2", t.get(4).toString());
+            assertEquals(DefaultDataBag.class, t.get(5).getClass());
+            DataBag bg = (DataBag)t.get(5);
+            for (Iterator<Tuple> bit = bg.iterator(); bit.hasNext();) {
+                Tuple bt = bit.next();
+                assertEquals(String.class, bt.get(0).getClass());
+                assertEquals(String.class, bt.get(1).getClass());            
+            }
+        }        
+    }
+
+    @Test
+    public void testBinStorageDetermineSchema2() throws IOException, ExecException {
+        // Create input file with ascii data
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"pigtester\t10\t1.2"});
+        
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' using PigStorage() " +
+                "as (name:chararray, age:int, gpa:double);");
+        String output = "/pig/out/TestEvalPipeline-testBinStorageDetermineSchema2";
+        pigServer.deleteFile(output);
+        pigServer.store("a", output, BinStorage.class.getName());
+        // test with different load specifications
+        String[] loads = {"p = load '" + output +"' using BinStorage() " +
+                "as (name:chararray, age:int, gpa:double);",
+                "p = load '" + output +"' using BinStorage() " +
+                "as (name, age, gpa);",
+                "p = load '" + output +"' using BinStorage() ;"};
+        // the corresponding generate statements
+        String[] generates = {"q = foreach p generate name, age, gpa;",
+                "q = foreach p generate name, age, gpa;",
+                "q = foreach p generate $0, $1, $2;"};
+        
+        for (int i = 0; i < loads.length; i++) {
+            pigServer.registerQuery(loads[i]);
+            pigServer.registerQuery(generates[i]);
+            Iterator<Tuple> it = pigServer.openIterator("q");
+            Tuple t = it.next();
+            assertEquals("pigtester", t.get(0));
+            assertEquals(String.class, t.get(0).getClass());
+            assertEquals(10, t.get(1));
+            assertEquals(Integer.class, t.get(1).getClass());
+            assertEquals(1.2, t.get(2));
+            assertEquals(Double.class, t.get(2).getClass());
+        }
+        
+        // test that valid casting is allowed
+        pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
+                " as (name, age:long, gpa:float);");
+        pigServer.registerQuery("q = foreach p generate name, age, gpa;");
+        Iterator<Tuple> it = pigServer.openIterator("q");
+        Tuple t = it.next();
+        assertEquals("pigtester", t.get(0));
+        assertEquals(String.class, t.get(0).getClass());
+        assertEquals(10L, t.get(1));
+        assertEquals(Long.class, t.get(1).getClass());
+        assertEquals(1.2f, t.get(2));
+        assertEquals(Float.class, t.get(2).getClass());
+        
+        // test that implicit casts work
+        pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
+        " as (name, age, gpa);");
+        pigServer.registerQuery("q = foreach p generate name, age + 1L, (int)gpa;");
+        it = pigServer.openIterator("q");
+        t = it.next();
+        assertEquals("pigtester", t.get(0));
+        assertEquals(String.class, t.get(0).getClass());
+        assertEquals(11L, t.get(1));
+        assertEquals(Long.class, t.get(1).getClass());
+        assertEquals(1, t.get(2));
+        assertEquals(Integer.class, t.get(2).getClass());
+    }
 
+    
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java Tue Oct 14 15:26:46 2008
@@ -115,7 +115,7 @@
             new FileSpec(inputFile, new FuncSpec("org.apache.pig.builtin.PigStorage")) ;
         FileSpec filespec2 =
             new FileSpec(outputFile, new FuncSpec("org.apache.pig.builtin.PigStorage"));
-        LOLoad load = new LOLoad(plan, genNewOperatorKeyId(), filespec1, null, true) ;       
+        LOLoad load = new LOLoad(plan, genNewOperatorKeyId(), filespec1, null, null, true) ;       
         LOStore store = new LOStore(plan, genNewOperatorKeyId(), filespec2) ;
         
         plan.add(load) ;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue Oct 14 15:26:46 2008
@@ -38,6 +38,7 @@
 import org.apache.pig.LoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -47,14 +48,7 @@
 import org.apache.pig.impl.builtin.GFAny;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LODefine;
 import org.apache.pig.impl.logicalLayer.*;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.impl.logicalLayer.LOPrinter;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
@@ -395,10 +389,6 @@
             return null;
         }
         
-        public Schema determineSchema(URL filename) {
-            return null;
-        }
-        
         public void fieldsToRead(Schema schema) {
             
         }
@@ -470,6 +460,15 @@
 	    public byte[] toBytes(Tuple t) throws IOException {
             return null;
 	    }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+         */
+        public Schema determineSchema(String fileName, ExecType execType,
+                DataStorage storage) throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
     }
     
     

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java Tue Oct 14 15:26:46 2008
@@ -55,6 +55,7 @@
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.junit.Before;
 import org.apache.pig.test.utils.TestHelper;
@@ -245,10 +246,6 @@
 
         public void fieldsToRead(Schema schema) {}
 
-        public Schema determineSchema(URL fileName) throws IOException {
-            return null;
-        }
-
 	    public byte[] toBytes(DataBag bag) throws IOException {
 	        return null;
 	    }
@@ -281,6 +278,15 @@
 	        return null;
 	    }
 
+        /* (non-Javadoc)
+         * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+         */
+        public Schema determineSchema(String fileName, ExecType execType,
+                DataStorage storage) throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
     }
 
 

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCast.java Tue Oct 14 15:26:46 2008
@@ -24,7 +24,9 @@
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
@@ -627,10 +629,6 @@
             return null;
         }
         
-        public Schema determineSchema(URL filename) {
-            return null;
-        }
-        
         public void fieldsToRead(Schema schema) {
             
         }
@@ -707,6 +705,15 @@
 	    public byte[] toBytes(Tuple t) throws IOException {
 	        return null;
 	    }
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
+         */
+        public Schema determineSchema(String fileName, ExecType execType,
+                DataStorage storage) throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
     }
 	
 	@Test

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java Tue Oct 14 15:26:46 2008
@@ -18,6 +18,8 @@
 
 package org.apache.pig.test;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.List;
 import java.util.ArrayList;
 
@@ -49,8 +51,19 @@
                 simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
             else
                 simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+            File fileA = new File("a");
+            File fileB = new File("b");
+            // Create file if it does not exist
+            try {
+                if(!fileA.createNewFile() || !fileB.createNewFile())
+                    fail("Unable to create input files");
+            } catch (IOException e) {
+                fail("Unable to create input files:" + e.getMessage());
+            }
+            fileA.deleteOnExit();
+            fileB.deleteOnExit();
         }
-
+        
     @Test
     public void testExpressionTypeChecking1() throws Throwable {
         LogicalPlan plan = new LogicalPlan() ;
@@ -907,11 +920,11 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
         LOLoad load2 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1031,11 +1044,11 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
         LOLoad load2 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1203,7 +1216,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1271,7 +1284,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1341,7 +1354,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1411,7 +1424,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1491,7 +1504,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1589,7 +1602,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1684,7 +1697,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1812,7 +1825,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -1916,11 +1929,11 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
         LOLoad load2 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -2087,11 +2100,11 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
         LOLoad load2 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;
@@ -2226,11 +2239,11 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
         LOLoad load2 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // schema for input#1
         Schema inputSchema1 = null ;

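The setUp() addition above is the other recurring adjustment in this patch: the dummy inputs 'a' and 'b' referenced by the test queries now have to exist on the local file system, presumably because schema determination can touch the input files while the plan is being built. A small sketch of the same pattern as a reusable helper (the class and method names are illustrative, not part of the commit):

    import java.io.File;
    import java.io.IOException;

    public class TestInputFiles {

        // Ensures an empty local file with the given name exists and is
        // removed when the JVM exits; returns false on failure so callers
        // can fail() the test.
        public static boolean ensureLocalInput(String name) {
            File f = new File(name);
            try {
                // createNewFile() returns false if the file already exists,
                // so treat that case as success.
                if (!f.exists() && !f.createNewFile()) {
                    return false;
                }
            } catch (IOException e) {
                return false;
            }
            f.deleteOnExit();
            return true;
        }
    }

In the committed test the two createNewFile() calls are made directly in setUp(), with fail() invoked if either file cannot be created.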
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java Tue Oct 14 15:26:46 2008
@@ -445,7 +445,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // set schemas
         load1.setEnforcedSchema(null) ;
@@ -511,7 +511,7 @@
         LOLoad load1 = new LOLoad(plan,
                                   genNewOperatorKey(),
                                   new FileSpec("pi", new FuncSpec(pigStorage)),
-                                  null, true) ;
+                                  null, null, true) ;
 
         // set schemas
         load1.setEnforcedSchema(null) ;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java Tue Oct 14 15:26:46 2008
@@ -18,6 +18,7 @@
 
 package org.apache.pig.test.utils;
 
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
 import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
@@ -193,6 +194,11 @@
 
         LogicalPlanBuilder.classloader = LogicalPlanTester.class.getClassLoader() ;
         PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+        try {
+            pigContext.connect();
+        } catch (ExecException e1) {
+            fail(e1.getClass().getName() + ": " + e1.getMessage() + " -- " + query);
+        }
         LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext);
 
         try {
@@ -222,6 +228,7 @@
             fail("IOException: " + e.getMessage());
         }
         catch (Exception e) {
+            e.printStackTrace();
             fail(e.getClass().getName() + ": " + e.getMessage() + " -- " + query);
         }
         return null;

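The LogicalPlanTester hunk above shows the harness side of the same requirement: the PigContext is now connected before it is handed to the LogicalPlanBuilder, presumably so that plan building can reach the (local) execution engine and file system while resolving inputs. Condensed into a standalone sketch, with the surrounding test plumbing dropped and the class and method names below being illustrative assumptions:

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;

    public class PlanBuilderSetup {

        // Builds a LogicalPlanBuilder backed by a connected local PigContext.
        // The ExecException propagated here corresponds to the fail(...) call
        // in the test utility.
        public static LogicalPlanBuilder newLocalBuilder() throws ExecException {
            PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
            pigContext.connect();   // the call added by this commit in the test path
            return new LogicalPlanBuilder(pigContext);
        }
    }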
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java Tue Oct 14 15:26:46 2008
@@ -42,7 +42,7 @@
             LOLoad load = new LOLoad(plan,
                                       genNewOperatorKey(),
                                       new FileSpec("pi", new FuncSpec(pigStorage)),
-                                      null, true) ;
+                                      null, null, true) ;
             return load ;
         } catch (IOException e) {
             throw new AssertionError("This cannot happen") ;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java?rev=704724&r1=704723&r2=704724&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java Tue Oct 14 15:26:46 2008
@@ -94,7 +94,7 @@
         FileSpec fileSpec = new FileSpec("pi",
                                          new FuncSpec(PigStorage.class.getName())) ;
         try {
-            load = new LOLoad(plan, getKey(node.attributes), fileSpec, null, true) ;
+            load = new LOLoad(plan, getKey(node.attributes), fileSpec, null, null, true) ;
             fillSchema(load, node.attributes) ;
         }
         catch (IOException ioe) {