Posted to commits@pig.apache.org by ol...@apache.org on 2008/04/02 22:11:10 UTC

svn commit: r644033 - in /incubator/pig/trunk: ./ lib-src/bzip2/org/apache/tools/bzip2r/ src/org/apache/pig/ src/org/apache/pig/backend/datastorage/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/datastorage/ src/org/apac...

Author: olga
Date: Wed Apr  2 13:11:06 2008
New Revision: 644033

URL: http://svn.apache.org/viewvc?rev=644033&view=rev
Log:
PIG-55: addition of custom splitter framework

Added:
    incubator/pig/trunk/src/org/apache/pig/Slice.java
    incubator/pig/trunk/src/org/apache/pig/Slicer.java
    incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
    incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java
    incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java
    incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java
    incubator/pig/trunk/test/org/apache/pig/test/TestParser.java
Removed:
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigSplit.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
    incubator/pig/trunk/src/org/apache/pig/PigServer.java
    incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
    incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java
    incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Wed Apr  2 13:11:06 2008
@@ -197,3 +197,5 @@
 	the top level directory (joa23 via gates).
 
     PIG-94: M3 code update for streaming
+    
+    PIG-55: added custom splitter

Modified: incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original)
+++ incubator/pig/trunk/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Wed Apr  2 13:11:06 2008
@@ -59,15 +59,10 @@
  */
 package org.apache.tools.bzip2r;
 
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.InputStream;
 import java.io.IOException;
-import java.util.Random;
+import java.io.InputStream;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
 
 /**
  * An input stream that decompresses from the BZip2 format (without the file
@@ -142,7 +137,7 @@
     private int[][] perm = new int[N_GROUPS][MAX_ALPHA_SIZE];
     private int[] minLens = new int[N_GROUPS];
 
-    private FSDataInputStream innerBsStream;
+    private SeekableInputStream innerBsStream;
     long readLimit = Long.MAX_VALUE;
     public long getReadLimit() {
     	return readLimit;
@@ -182,8 +177,8 @@
     
     private long retPos, oldPos;
 
-    public CBZip2InputStream(FSDataInputStream zStream, int blockSize) throws IOException {
-        retPos = oldPos = zStream.getPos();
+    public CBZip2InputStream(SeekableInputStream zStream, int blockSize) throws IOException {
+        retPos = oldPos = zStream.tell();
     	ll8 = null;
         tt = null;
         checkComputedCombinedCRC = blockSize == -1;
@@ -193,7 +188,7 @@
         setupBlock();
     }
     
-    public CBZip2InputStream(FSDataInputStream zStream) throws IOException {
+    public CBZip2InputStream(SeekableInputStream zStream) throws IOException {
     	this(zStream, -1);
     }
 
@@ -231,7 +226,7 @@
     public long getPos() throws IOException{
     	if (innerBsStream == null)
     		return retPos;
-    	long newPos = innerBsStream.getPos();
+    	long newPos = innerBsStream.tell();
 	
 		if (newPos != oldPos){
 			retPos = oldPos;
@@ -364,7 +359,7 @@
         }
     }
 
-    private void bsSetStream(FSDataInputStream f) {
+    private void bsSetStream(SeekableInputStream f) {
         innerBsStream = f;
         bsLive = 0;
         bsBuff = 0;

Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Wed Apr  2 13:11:06 2008
@@ -28,13 +28,11 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
@@ -314,7 +312,7 @@
         // already submitted to the back-end for compilation and
         // execution.
         
-        LogicalPlan readFrom = (LogicalPlan) aliases.get(id);
+        LogicalPlan readFrom = aliases.get(id);
         
         // Run
         try {
@@ -351,7 +349,7 @@
         if (!aliases.containsKey(id))
             throw new IOException("Invalid alias: " + id);
         
-        if (FileLocalizer.fileExists(filename, pigContext)) {
+        if (FileLocalizer.fileExists(filename, pigContext.getDfs())) {
             StringBuilder sb = new StringBuilder();
             sb.append("Output file ");
             sb.append(filename);
@@ -481,19 +479,17 @@
     /**
      * Returns the length of a file in bytes which exists in the HDFS (accounts for replication).
      * @param filename
-     * @return
      * @throws IOException
      */
     public long fileSize(String filename) throws IOException {
         DataStorage dfs = pigContext.getDfs();
         ElementDescriptor elem = dfs.asElement(filename);
-        Map<String, Object> elemProps = elem.getStatistics();
-        String length = (String) elemProps.get(ElementDescriptor.LENGTH_KEY);
-        
-        Properties dfsProps = dfs.getConfiguration();
-        String replication = dfsProps.getProperty(DataStorage.DEFAULT_REPLICATION_FACTOR_KEY);
-            
-        return (new Long(length)).longValue() * (new Integer(replication)).intValue();
+        Map<String, Object> stats = elem.getStatistics();
+        long length = (Long) stats.get(ElementDescriptor.LENGTH_KEY);
+        int replication = (Short) stats
+                .get(ElementDescriptor.BLOCK_REPLICATION_KEY);
+
+        return length * replication;
     }
     
     public boolean existsFile(String filename) throws IOException {

Added: incubator/pig/trunk/src/org/apache/pig/Slice.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/Slice.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/Slice.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/Slice.java Wed Apr  2 13:11:06 2008
@@ -0,0 +1,83 @@
+package org.apache.pig;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.data.Tuple;
+
+/**
+ * A grouping of data that can be processed individually by Pig. Instances of
+ * this interface are created by {@link Slicer}, serialized, and sent to nodes
+ * to be processed.
+ * <p>
+ * {@link #getLocations} is called as part of the configuration process to
+ * determine where this Slice should be run for maximal locality with the data
+ * to be read. Once the Slice arrives on the processing node,
+ * {@link #init(DataStorage)} is called to give it access to the
+ * <code>DataStorage</code> it should use to load Tuples. After
+ * <code>init</code> has been called, any of the other methods on this
+ * interface may be called as part of Pig's processing.
+ */
+public interface Slice extends Serializable {
+
+    /**
+     * Returns string representations of all the files that will be used as part
+     * of processing this Slice.
+     * <p>
+     * 
+     * This is the only method on Slice that is valid to call before
+     * {@link #init(DataStorage)} has been called.
+     */
+    String[] getLocations();
+
+    /**
+     * Initializes this Slice with the DataStorage it will use to do its work.
+     * <p>
+     * This will always be called before <code>getLength</code>,
+     * <code>close</code>, <code>getPos</code>, <code>getProgress</code>
+     * and <code>next</code>.
+     */
+    void init(DataStorage store) throws IOException;
+
+    /**
+     * Returns the length in bytes of all of the data that will be processed by
+     * this Slice.
+     * <p>
+     * Only valid to call after {@link #init(DataStorage)} has been called.
+     */
+    long getLength();
+
+    /**
+     * Closes any streams this Slice has opened as part of its work.
+     * <p>
+     * Only valid to call after {@link #init(DataStorage)} has been called.
+     */
+    void close() throws IOException;
+
+    /**
+     * Returns the number of bytes read so far as part of processing this Slice.
+     * <p>
+     * Only valid to call after {@link #init(DataStorage)} has been called.
+     */
+    long getPos() throws IOException;
+
+    /**
+     * Returns the fraction of this Slice that is complete, from 0.0 to 1.0.
+     * <p>
+     * Only valid to call after {@link #init(DataStorage)} has been called.
+     */
+    float getProgress() throws IOException;
+
+    /**
+     * Loads the next value from this Slice into <code>value</code>.
+     * <p>
+     * Only valid to call after {@link #init(DataStorage)} has been called.
+     * 
+     * @param value -
+     *                the Tuple to be filled with the next value.
+     * @return - true if there are more Tuples to be read.
+     */
+    boolean next(Tuple value) throws IOException;
+
+}

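For illustration, a custom Slice can be tiny. The sketch below generates a fixed range of integers instead of reading a file; the class name is hypothetical (compare test/org/apache/pig/test/RangeSlicer.java added by this commit), and the Tuple/DataAtom construction calls are assumptions about the current data API:

    import java.io.IOException;

    import org.apache.pig.Slice;
    import org.apache.pig.backend.datastorage.DataStorage;
    import org.apache.pig.data.DataAtom;
    import org.apache.pig.data.Tuple;

    // Hypothetical Slice that emits the integers [start, end) as
    // single-field Tuples. No files are involved, so getLocations
    // returns an empty array and init has nothing to open.
    public class RangeSlice implements Slice {
        private int start, end, next;

        public RangeSlice(int start, int end) {
            this.start = start;
            this.end = end;
        }

        public String[] getLocations() { return new String[0]; }

        public void init(DataStorage store) throws IOException {
            next = start;
        }

        public long getLength() { return end - start; }

        public long getPos() throws IOException { return next - start; }

        public float getProgress() throws IOException {
            return (float) (next - start) / (end - start);
        }

        public boolean next(Tuple value) throws IOException {
            if (next >= end) return false;
            Tuple t = new Tuple();               // assumption: no-arg Tuple
            t.appendField(new DataAtom(next++)); // assumption: DataAtom field
            value.copyFrom(t);                   // same pattern as PigSlice
            return true;
        }

        public void close() throws IOException { }

        private static final long serialVersionUID = 1L;
    }
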
Added: incubator/pig/trunk/src/org/apache/pig/Slicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/Slicer.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/Slicer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/Slicer.java Wed Apr  2 13:11:06 2008
@@ -0,0 +1,32 @@
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.pig.backend.datastorage.DataStorage;
+
+/**
+ * Produces independent slices of data from a given location to be processed in
+ * parallel by Pig.
+ * <p>
+ * If a class implementing this interface is given as the LoadFunc in a Pig
+ * script, it will be used to make slices for that load statement.
+ */
+public interface Slicer {
+    /**
+     * Checks that <code>location</code> can be parsed by this Slicer and,
+     * if the Slicer uses the DataStorage, that it is readable from there. If
+     * either check fails, an IOException with a message explaining why is
+     * thrown.
+     * <p>
+     * This does not ensure that all the data in <code>location</code> is
+     * valid. It's a preflight check that there's some chance of the Slicer
+     * working before actual Slices are created and sent off for processing.
+     */
+    void validate(DataStorage store, String location) throws IOException;
+
+    /**
+     * Creates slices of data from <code>store</code> at <code>location</code>.
+     * 
+     * @return the Slices to be serialized and sent out to nodes for processing.
+     */
+    Slice[] slice(DataStorage store, String location) throws IOException;
+}

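The matching Slicer is equally small. A hypothetical sketch that treats location as a count rather than a path and hands back that many of the slices sketched above:

    import java.io.IOException;

    import org.apache.pig.Slice;
    import org.apache.pig.Slicer;
    import org.apache.pig.backend.datastorage.DataStorage;

    // Hypothetical Slicer: "location" is an integer count, not a path.
    public class RangeSlicer implements Slicer {

        public void validate(DataStorage store, String location)
                throws IOException {
            try {
                Integer.parseInt(location);
            } catch (NumberFormatException e) {
                throw new IOException(location + " is not an integer");
            }
        }

        public Slice[] slice(DataStorage store, String location)
                throws IOException {
            int count = Integer.parseInt(location);
            Slice[] slices = new Slice[count];
            for (int i = 0; i < count; i++) {
                slices[i] = new RangeSlice(i, i + 1); // one value per slice
            }
            return slices;
        }
    }

Per the javadoc above, naming such a class as the LoadFunc of a load statement routes slicing for that statement through it instead of the default PigSlicer.
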
Modified: incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java Wed Apr  2 13:11:06 2008
@@ -16,9 +16,16 @@
 public interface ElementDescriptor extends 
             Comparable<ElementDescriptor> {
     
+        /** Available from getConfiguration as a String and getStatistics as a Long. */
         public static final String BLOCK_SIZE_KEY = "pig.path.block.size";
+
+        /** Available from getConfiguration as a String and getStatistics as a Short. */
         public static final String BLOCK_REPLICATION_KEY = "pig.path.block.replication";
+        
+        /** Available from getStatistics as a Long. */
         public static final String LENGTH_KEY = "pig.path.length";
+        
+        /** Available from getStatistics as a Long. */
         public static final String MODIFICATION_TIME_KEY = "pig.path.modification.time";
         
         //

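These typed keys are what the PigServer.fileSize() change above now relies on; a minimal hypothetical consumer, given some ElementDescriptor elem:

    Map<String, Object> stats = elem.getStatistics();
    long length = (Long) stats.get(ElementDescriptor.LENGTH_KEY);
    short replication = (Short) stats.get(ElementDescriptor.BLOCK_REPLICATION_KEY);
    long blockSize = (Long) stats.get(ElementDescriptor.BLOCK_SIZE_KEY);
    // bytes actually occupied on the cluster, as in PigServer.fileSize()
    long consumed = length * replication;
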
Added: incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlice.java Wed Apr  2 13:11:06 2008
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.executionengine;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.pig.Slice;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.SeekableInputStream;
+import org.apache.pig.backend.datastorage.SeekableInputStream.FLAGS;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.tools.bzip2r.CBZip2InputStream;
+
+/**
+ * Slice that loads data using a LoadFunc.
+ */
+public class PigSlice implements Slice {
+
+    public PigSlice(String path, String parser, long start, long length) {
+        this.file = path;
+        this.start = start;
+        this.length = length;
+        this.parser = parser;
+    }
+
+    public String[] getLocations() {
+        return new String[] { file };
+    }
+
+    public long getLength() {
+        return length;
+    }
+
+    public void init(DataStorage base) throws IOException {
+        if (parser == null) {
+            loader = new PigStorage();
+        } else {
+            try {
+                loader = (LoadFunc) PigContext.instantiateFuncFromSpec(parser);
+            } catch (Exception exp) {
+                throw new RuntimeException("can't instantiate " + parser);
+            }
+        }
+        fsis = base.asElement(base.getActiveContainer(), file).sopen();
+        fsis.seek(start, FLAGS.SEEK_CUR);
+
+        end = start + getLength();
+
+        if (file.endsWith(".bz") || file.endsWith(".bz2")) {
+            is = new CBZip2InputStream(fsis, 9);
+        } else if (file.endsWith(".gz")) {
+            is = new GZIPInputStream(fsis);
+            // We can't tell how much of the underlying stream GZIPInputStream
+            // has actually consumed
+            end = Long.MAX_VALUE;
+        } else {
+            is = fsis;
+        }
+        loader.bindTo(file.toString(), new BufferedPositionedInputStream(is,
+                start), start, end);
+    }
+
+    public boolean next(Tuple value) throws IOException {
+        Tuple t = loader.getNext();
+        if (t == null) {
+            return false;
+        }
+        value.copyFrom(t);
+        return true;
+    }
+
+    public long getPos() throws IOException {
+        return fsis.tell();
+    }
+
+    public void close() throws IOException {
+        is.close();
+    }
+
+    public float getProgress() throws IOException {
+        float progress = getPos() - start;
+        float finish = getLength();
+        return progress / finish;
+    }
+
+    // assigned during construction
+    String file;
+    long start;
+    long length;
+    String parser;
+
+    // Created as part of init
+    private InputStream is;
+    private SeekableInputStream fsis;
+    private long end;
+    private LoadFunc loader;
+
+    private static final long serialVersionUID = 1L;
+}

Added: incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java Wed Apr  2 13:11:06 2008
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.executionengine;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.Slice;
+import org.apache.pig.Slicer;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.impl.io.FileLocalizer;
+
+/**
+ * Creates one slice per block-sized chunk of each file at location. If location
+ * is a glob or a directory, slices are created for every matched file.
+ * <p>
+ * 
+ * If individual files at location end with <code>.gz</code> or
+ * <code>.bz2</code>, they will be decompressed before being passed on to the
+ * LoadFunc.
+ */
+public class PigSlicer implements Slicer {
+    /**
+     * @param funcSpec -
+     *                the funcSpec for a LoadFunc that can process the data at
+     *                location.
+     */
+    public PigSlicer(String funcSpec) {
+        this.funcSpec = funcSpec;
+    }
+
+    public void setSplittable(boolean splittable) {
+        this.splittable = splittable;
+    }
+
+    public Slice[] slice(DataStorage store, String location) throws IOException {
+        validate(store, location);
+        List<Slice> slices = new ArrayList<Slice>();
+        List<ElementDescriptor> paths = new ArrayList<ElementDescriptor>();
+
+        // If you give a non-glob name, asCollection returns a single
+        // element with just that name.
+        ElementDescriptor[] globPaths = store.asCollection(location);
+        for (int m = 0; m < globPaths.length; m++) {
+            paths.add(globPaths[m]);
+        }
+        for (int j = 0; j < paths.size(); j++) {
+            ElementDescriptor fullPath = store.asElement(store
+                    .getActiveContainer(), paths.get(j));
+            if (fullPath instanceof ContainerDescriptor) {
+                for (ElementDescriptor child : ((ContainerDescriptor) fullPath)) {
+                    paths.add(child);
+                }
+                continue;
+            }
+            Map<String, Object> stats = fullPath.getStatistics();
+            long bs = (Long) (stats.get(ElementDescriptor.BLOCK_SIZE_KEY));
+            long size = (Long) (stats.get(ElementDescriptor.LENGTH_KEY));
+            long pos = 0;
+            String name = fullPath.toString();
+            System.out.println(size + " " + name);
+            if (name.endsWith(".gz") || !splittable) {
+                // Anything that ends with a ".gz" we must process as a complete
+                // file
+                slices.add(new PigSlice(name, funcSpec, 0, size));
+            } else {
+                while (pos < size) {
+                    if (pos + bs > size) {
+                        bs = size - pos;
+                    }
+                    slices.add(new PigSlice(name, funcSpec, pos, bs));
+                    pos += bs;
+                }
+            }
+        }
+        return slices.toArray(new Slice[slices.size()]);
+    }
+
+    public void validate(DataStorage store, String location) throws IOException {
+        if (!FileLocalizer.fileExists(location, store)) {
+            throw new IOException(store.asElement(location) + " does not exist");
+        }
+    }
+
+    private String funcSpec;
+    
+    private boolean splittable;
+}

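To make the splitting arithmetic concrete (sizes in MB, hypothetical numbers): a 300MB file with a 128MB block size produces three slices, while a .gz or non-splittable file of any size always produces one:

    // The while (pos < size) loop above, with size = 300 and bs = 128:
    //   pos = 0   -> new PigSlice(name, funcSpec, 0,   128)
    //   pos = 128 -> new PigSlice(name, funcSpec, 128, 128)
    //   pos = 256 -> bs trimmed to 44; new PigSlice(name, funcSpec, 256, 44)
    // When name.endsWith(".gz") || !splittable:
    //   new PigSlice(name, funcSpec, 0, size)
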
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Wed Apr  2 13:11:06 2008
@@ -35,20 +35,20 @@
     public HDataStorage(HConfiguration conf) throws IOException {
         fs = FileSystem.get(conf.getConfiguration());
     }
-
+    
     public void init() { }
     
     public void close() throws IOException {
         fs.close();
     }
-    
+
     public Properties getConfiguration() {
         Properties props = new HConfiguration(fs.getConf());
-                
+
         short defaultReplication = fs.getDefaultReplication();
-        props.setProperty(DEFAULT_REPLICATION_FACTOR_KEY,
-                          (new Short(defaultReplication)).toString());
-        
+        props.setProperty(DEFAULT_REPLICATION_FACTOR_KEY, ""
+                + defaultReplication);
+
         return props;
     }
     

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Wed Apr  2 13:11:06 2008
@@ -8,6 +8,7 @@
 import java.util.Map;
 import java.util.HashMap;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -123,13 +124,12 @@
     public Map<String, Object> getStatistics() throws IOException {
         HashMap<String, Object> props = new HashMap<String, Object>();
         
-        Long length = new Long(fs.getHFS().getFileStatus(path).getLen());
+        FileStatus fileStatus = fs.getHFS().getFileStatus(path);
 
-        Long modificationTime = new Long(fs.getHFS().getFileStatus(path).
-                                         getModificationTime());
-
-        props.put(LENGTH_KEY, length.toString());
-        props.put(MODIFICATION_TIME_KEY, modificationTime.toString());
+        props.put(BLOCK_SIZE_KEY, fileStatus.getBlockSize());
+        props.put(BLOCK_REPLICATION_KEY, fileStatus.getReplication());
+        props.put(LENGTH_KEY, fileStatus.getLen());
+        props.put(MODIFICATION_TIME_KEY, fileStatus.getModificationTime());
         
         return props;
     }

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Wed Apr  2 13:11:06 2008
@@ -19,13 +19,10 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.Map;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.builtin.BinStorage;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.FunctionInstantiator;
 import org.apache.pig.impl.builtin.FindQuantiles;
@@ -38,7 +35,6 @@
 import org.apache.pig.impl.eval.GenerateSpec;
 import org.apache.pig.impl.eval.ProjectSpec;
 import org.apache.pig.impl.eval.CompositeEvalSpec;
-import org.apache.pig.impl.eval.MapLookupSpec;
 import org.apache.pig.impl.eval.SortDistinctSpec;
 import org.apache.pig.impl.eval.StarSpec;
 import org.apache.pig.impl.eval.EvalSpecVisitor;
@@ -53,16 +49,11 @@
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LOUnion;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.physicalLayer.PlanCompiler;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.SortPartitioner;
-import org.apache.pig.backend.hadoop.datastorage.HFile;
-import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
 
 // compiler for mapreduce physical plans
 public class MapreducePlanCompiler {
@@ -98,9 +89,7 @@
                                               logicalKey,
                                               pigContext);
 
-            String filename = FileLocalizer.fullPath(materializedResult.outFileSpec.getFileName(), pigContext);
-            FileSpec fileSpec = new FileSpec(filename, materializedResult.outFileSpec.getFuncSpec());
-            pom.addInputFile(fileSpec);
+            pom.addInputFile(materializedResult.outFileSpec);
             pom.mapParallelism = Math.max(pom.mapParallelism, materializedResult.parallelismRequest);
 
             return pom.getOperatorKey();            
@@ -188,13 +177,10 @@
                                               logicalKey,
                                               pigContext,
                                               compiledInputs);
-            LOLoad loLoad = (LOLoad) lo;
-            String filename = FileLocalizer.fullPath(loLoad.getInputFileSpec().getFileName(), pigContext);
-            FileSpec fileSpec = new FileSpec(filename, loLoad.getInputFileSpec().getFuncSpec());
-            pom.addInputFile(fileSpec);
+            pom.addInputFile(((LOLoad) lo).getInputFileSpec());
             pom.mapParallelism = Math.max(pom.mapParallelism, lo.getRequestedParallelism());
-            pom.setProperty("pig.input.splitable", 
-                            Boolean.toString(loLoad.isSplitable()));
+            pom.setProperty("pig.input.splittable", 
+                            Boolean.toString(((LOLoad)lo).isSplittable()));
             return pom.getOperatorKey();
         } 
         else if (lo instanceof LOStore) {
@@ -407,7 +393,7 @@
         String comparatorFuncName = loSort.getSortSpec().getComparatorName();
         if (comparatorFuncName != null) {
             sortJob.userComparator =
-                (Class<WritableComparator>)PigContext.resolveClassName(
+                PigContext.resolveClassName(
                     comparatorFuncName);
         }
 

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Wed Apr  2 13:11:06 2008
@@ -68,8 +68,7 @@
                 }
             }
 
-            PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();
-            index = split.getIndex();
+            index = PigInputFormat.getActiveSplit().getIndex();
 
             Datum groupName = ((Tuple) key).getField(0);
             finalout.group = ((Tuple) key);

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java Wed Apr  2 13:11:06 2008
@@ -18,52 +18,53 @@
 package org.apache.pig.backend.hadoop.executionengine.mapreduceExec;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.pig.LoadFunc;
+import org.apache.pig.Slice;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.PigSlicer;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.ValidatingInputFileSpec;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.tools.bzip2r.CBZip2InputStream;
 
+public class PigInputFormat implements InputFormat<Text, Tuple>,
+        JobConfigurable {
 
-public class PigInputFormat implements InputFormat<Text, Tuple>, JobConfigurable {
-
-    public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-        boolean isSplitable = job.getBoolean("pig.input.splitable", false);
-        
-        ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer.deserialize(job.get("pig.inputs"));        
-        ArrayList<EvalSpec> mapFuncs = (ArrayList<EvalSpec>) ObjectSerializer.deserialize(job.get("pig.mapFuncs",""));        
-        ArrayList<EvalSpec> groupFuncs = (ArrayList<EvalSpec>) ObjectSerializer.deserialize(job.get("pig.groupFuncs", ""));        
-        PigContext pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
-        //TODO: don't understand this code
+    @SuppressWarnings("unchecked")
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+            throws IOException {
+        boolean isSplittable = job.getBoolean("pig.input.splittable", false);
+        ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer
+                .deserialize(job.get("pig.inputs"));
+        ArrayList<EvalSpec> mapFuncs = (ArrayList<EvalSpec>) ObjectSerializer
+                .deserialize(job.get("pig.mapFuncs", ""));
+        ArrayList<EvalSpec> groupFuncs = (ArrayList<EvalSpec>) ObjectSerializer
+                .deserialize(job.get("pig.groupFuncs", ""));
+
+        PigContext pigContext = (PigContext) ObjectSerializer.deserialize(job
+                .get("pig.pigContext"));
+        // TODO: don't understand this code
         // added for UNION: set group func arity to match arity of inputs
-        if (groupFuncs!=null && groupFuncs.size() != inputs.size()) {
+        if (groupFuncs != null && groupFuncs.size() != inputs.size()) {
             groupFuncs = new ArrayList<EvalSpec>();
             for (int i = 0; i < groupFuncs.size(); i++) {
-                groupFuncs.set(i,null);
+                groupFuncs.set(i, null);
             }
         }
-        
+
         if (inputs.size() != mapFuncs.size()) {
             StringBuilder sb = new StringBuilder();
             sb.append("number of inputs != number of map functions: ");
@@ -74,7 +75,6 @@
             sb.append(job.get("pig.mapFuncs", "missing"));
             throw new IOException(sb.toString());
         }
-        
         if (groupFuncs!= null && inputs.size() != groupFuncs.size()) {
             StringBuilder sb = new StringBuilder();
             sb.append("number of inputs != number of group functions: ");
@@ -83,162 +83,52 @@
             sb.append(groupFuncs.size());
             throw new IOException(sb.toString());
         }
-        
-        ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
+
+        FileSystem fs = FileSystem.get(job);
+        List<SliceWrapper> splits = new ArrayList<SliceWrapper>();
         for (int i = 0; i < inputs.size(); i++) {
-            Path path = new Path(inputs.get(i).getFileName());
-            String parser = inputs.get(i).getFuncSpec();
-            FileSystem fs = path.getFileSystem(job);
-
-            fs.setWorkingDirectory(new Path("/user", job.getUser()));
-            ArrayList<Path> paths = new ArrayList<Path>();
-            // If you give a non-glob name, globPaths returns a single
-            // element with just that name.
-            Path[] globPaths = fs.globPaths(path); 
-            for (int m = 0; m < globPaths.length; m++) paths.add(globPaths[m]);
-            //paths.add(path);
-            for (int j = 0; j < paths.size(); j++) {
-                Path fullPath = new Path(fs.getWorkingDirectory(), paths.get(j));
-                if (fs.getFileStatus(fullPath).isDir()) {
-                    FileStatus children[] = fs.listStatus(fullPath);
-                    for(int k = 0; k < children.length; k++) {
-                        paths.add(children[k].getPath());
-                    }
-                    continue;
-                }
-                long bs = fs.getFileStatus(fullPath).getBlockSize();
-                long size = fs.getFileStatus(fullPath).getLen();
-                long pos = 0;
-                String name = paths.get(j).getName();
-                if (name.endsWith(".gz") || !isSplitable) {
-                    // Anything that ends with a ".gz" or can't be split
-                    // we must process as a complete file
-                    splits.add(new PigSplit(pigContext, fs, fullPath, parser, groupFuncs==null ? null : groupFuncs.get(i), mapFuncs.get(i), i, 0, size));
-                } else {
-                    while (pos < size) {
-                        if (pos + bs > size)
-                            bs = size - pos;
-                        splits.add(new PigSplit(pigContext, fs, fullPath, parser, groupFuncs==null ? null : groupFuncs.get(i), mapFuncs.get(i), i, pos, bs));
-                        pos += bs;
-                    }
-                }
+            DataStorage store = new HDataStorage(job);
+            ValidatingInputFileSpec spec;
+            if (inputs.get(i) instanceof ValidatingInputFileSpec) {
+                spec = (ValidatingInputFileSpec) inputs.get(i);
+            } else {
+                spec = new ValidatingInputFileSpec(inputs.get(i), store);
+            }
+            EvalSpec groupBy = groupFuncs == null ? null : groupFuncs.get(i);
+            if (isSplittable && (spec.getSlicer() instanceof PigSlicer)) {
+                ((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
+            }
+            Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());
+            for (Slice split : pigs) {
+                splits.add(new SliceWrapper(split, pigContext, groupBy,
+                        mapFuncs.get(i), i, fs));
             }
         }
-        return splits.toArray(new PigSplit[splits.size()]);
+        return splits.toArray(new SliceWrapper[splits.size()]);
     }
 
-   public RecordReader<Text, Tuple> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-        PigRecordReader r = new PigRecordReader(job, (PigSplit)split, compressionCodecs);
-        return r;
+    public RecordReader<Text, Tuple> getRecordReader(InputSplit split,
+            JobConf job, Reporter reporter) throws IOException {
+        activeSplit.set((SliceWrapper) split);
+        return ((SliceWrapper) split).makeReader(job);
     }
 
-    private CompressionCodecFactory compressionCodecs = null;
-
-    static public String codecList;
     public void configure(JobConf conf) {
-        compressionCodecs = new CompressionCodecFactory(conf);
-        codecList = conf.get("io.compression.codecs", "none");
     }
-    
-    public static class PigRecordReader implements RecordReader<Text, Tuple> {
-        /**
-         * This is a tremendously ugly hack to get around the fact that mappers do not have access
-         * to their readers. We take advantage of the fact that RecordReader.next and Mapper.map is
-         * run on same the thread to share information through a thread local variable.
-         */
-        static ThreadLocal<PigRecordReader> myReader = new ThreadLocal<PigRecordReader>();
-
-        public static PigRecordReader getPigRecordReader() {
-            return myReader.get();
-        }
-
-        private InputStream     is;
-        private FSDataInputStream   fsis;
-        private long            end;
-        private PigSplit    split;
-        LoadFunc                loader;
-        CompressionCodecFactory compressionFactory;
-        JobConf job;
-        
-        PigRecordReader(JobConf job, PigSplit split, CompressionCodecFactory compressionFactory) throws IOException {
-            this.split = split;
-            this.job = job;
-            this.compressionFactory = compressionFactory;
-            loader = split.getLoadFunction();
-            Path path = split.getPath();
-            FileSystem fs = path.getFileSystem(job);
-            fs.setWorkingDirectory(new Path("/user/" + job.getUser()));
-            CompressionCodec codec = compressionFactory.getCodec(split.getPath());
-            long start = split.getStart();
-            fsis = fs.open(split.getPath());
-            fsis.seek(start);
-
-            if (codec != null) {
-                is = codec.createInputStream(fsis);
-                end = Long.MAX_VALUE;
-                
-            } else{
-                end = start + split.getLength();    
-                
-                if (split.file.getName().endsWith(".bz") ||
-                        split.file.getName().endsWith(".bz2")) {
-                    is = new CBZip2InputStream(fsis,9);
-                }else{
-                    is = fsis;
-                }
-            }
-            myReader.set(this);
-            loader.bindTo(split.getPath().toString(), new BufferedPositionedInputStream(is, start), start, end);
-         
-            // Mimic FileSplit
-            job.set("map.input.file", split.getPath().toString());
-            job.setLong("map.input.start", split.getStart());
-            job.setLong("map.input.length", split.getLength());
-        }
-
-        public JobConf getJobConf(){
-            return job;
-        }
-
-        public boolean next(Text key, Tuple value) throws IOException {
-            Tuple t = loader.getNext();
-            if (t == null) {
-                return false;
-            }
 
-            key.set(split.getPath().getName());
-            value.copyFrom(t);
-            return true;
-        }
-
-        public long getPos() throws IOException {
-            return fsis.getPos();
-        }
-
-        public void close() throws IOException {
-            is.close();
-        }
-
-        public PigSplit getPigFileSplit() {
-            return split;
-        }
-
-        public Text createKey() {
-            return new Text();
-        }
-
-        public Tuple createValue() {
-            return new Tuple();
-        }
-
-    public float getProgress() throws IOException {
-        float progress = getPos() - split.getStart();
-        float finish = split.getLength();
-        return progress/finish;
-    }
+    public static SliceWrapper getActiveSplit() {
+        return activeSplit.get();
     }
 
+    /**
+     * This is a tremendously ugly hack to get around the fact that mappers do
+     * not have access to their readers. We take advantage of the fact that
+     * RecordReader.next and Mapper.map are run on the same thread to share
+     * information through a thread local variable.
+     */
+    private static ThreadLocal<SliceWrapper> activeSplit = new ThreadLocal<SliceWrapper>();
+
     public void validateInput(JobConf arg0) throws IOException {
     }
 
- }
+}

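The ThreadLocal replaces the old PigRecordReader.getPigRecordReader() hook: anything running on the map thread can recover the split it is reading. A minimal sketch of the pattern (the same one FileLocalizer.openDFSFile now uses, below):

    // Inside mapper-side code, on the thread consuming the reader:
    SliceWrapper wrapper = PigInputFormat.getActiveSplit();
    if (wrapper == null)
        throw new RuntimeException("no active split on this thread");
    JobConf conf = wrapper.getJobConf(); // JobConf of the running job
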
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Wed Apr  2 13:11:06 2008
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.mapreduceExec;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -52,7 +51,6 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
 import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.tools.timer.PerformanceTimerFactory;
 
 
 /**
@@ -220,7 +218,7 @@
     
     private void setupMapPipe(Properties properties, Reporter reporter) 
     throws IOException {
-        PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();
+        SliceWrapper split = PigInputFormat.getActiveSplit();
         index = split.getIndex();
         EvalSpec evalSpec = split.getEvalSpec();
         

Added: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java Wed Apr  2 13:11:06 2008
@@ -0,0 +1,174 @@
+package org.apache.pig.backend.hadoop.executionengine.mapreduceExec;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.pig.Slice;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.EvalSpec;
+
+/**
+ * Wraps a {@link Slice} in an {@link InputSplit} so it's usable by Hadoop.
+ */
+public class SliceWrapper implements InputSplit {
+
+    private EvalSpec groupbySpec;
+    private EvalSpec evalSpec;
+    private int index;
+    private PigContext pigContext;
+    private Slice wrapped;
+    private transient FileSystem fs; // transient so it isn't serialized
+    private transient JobConf lastConf;
+
+    public SliceWrapper() {
+        // for deserialization
+    }
+
+    public SliceWrapper(Slice slice, PigContext context, EvalSpec groupbySpec,
+            EvalSpec evalSpec, int index, FileSystem fs) {
+        this.wrapped = slice;
+        this.pigContext = context;
+        this.groupbySpec = groupbySpec;
+        this.evalSpec = evalSpec;
+        this.index = index;
+        this.fs = fs;
+    }
+
+    public EvalSpec getEvalSpec() {
+        return evalSpec;
+    }
+
+    public EvalSpec getGroupbySpec() {
+        return groupbySpec;
+    }
+
+    public int getIndex() {
+        return index;
+    }
+
+    public long getLength() throws IOException {
+        return wrapped.getLength();
+    }
+
+    public String[] getLocations() throws IOException {
+        Set<String> locations = new HashSet<String>();
+        for (String loc : wrapped.getLocations()) {
+            Path path = new Path(loc);
+            String hints[][] = fs.getFileCacheHints(path, 0, fs.getFileStatus(
+                    path).getLen());
+            for (int i = 0; i < hints.length; i++) {
+                for (int j = 0; j < hints[i].length; j++) {
+                    locations.add(hints[i][j]);
+                }
+            }
+        }
+        return locations.toArray(new String[locations.size()]);
+    }
+
+    public JobConf getJobConf() {
+        return lastConf;
+    }
+
+    public RecordReader<Text, Tuple> makeReader(JobConf job) throws IOException {
+        lastConf = job;
+        DataStorage store = new HDataStorage(job);
+        store.setActiveContainer(store.asContainer("/user/" + job.getUser()));
+        wrapped.init(store);
+        return new RecordReader<Text, Tuple>() {
+
+            public void close() throws IOException {
+                wrapped.close();
+            }
+
+            public Text createKey() {
+                return new Text();
+            }
+
+            public Tuple createValue() {
+                return new Tuple();
+            }
+
+            public long getPos() throws IOException {
+                return wrapped.getPos();
+            }
+
+            public float getProgress() throws IOException {
+                return wrapped.getProgress();
+            }
+
+            public boolean next(Text key, Tuple value) throws IOException {
+                return wrapped.next(value);
+            }
+        };
+    }
+
+    public void readFields(DataInput is) throws IOException {
+        pigContext = (PigContext) readObject(is);
+
+        groupbySpec = (EvalSpec) readObject(is);
+        if (groupbySpec != null) {
+            groupbySpec.instantiateFunc(pigContext);
+        }
+        evalSpec = (EvalSpec) readObject(is);
+        if (evalSpec != null) {
+            evalSpec.instantiateFunc(pigContext);
+        }
+        index = is.readInt();
+        wrapped = (Slice) readObject(is);
+    }
+
+    private IOException wrapException(Exception e) {
+        IOException newE = new IOException(e.getMessage());
+        newE.initCause(e);
+        return newE;
+    }
+
+    private Object readObject(DataInput is) throws IOException {
+        byte[] bytes = new byte[is.readInt()];
+        is.readFully(bytes);
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
+                bytes));
+        try {
+            return ois.readObject();
+        } catch (ClassNotFoundException cnfe) {
+            IOException newE = wrapException(cnfe);
+            throw newE;
+        }
+    }
+
+    public void write(DataOutput os) throws IOException {
+        writeObject(pigContext, os);
+        writeObject(groupbySpec, os);
+        writeObject(evalSpec, os);
+        os.writeInt(index);
+        writeObject(wrapped, os);
+    }
+
+    private void writeObject(Serializable obj, DataOutput os)
+            throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(obj);
+        byte[] bytes = baos.toByteArray();
+        os.writeInt(bytes.length);
+        os.write(bytes);
+    }
+
+}

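Because write() length-prefixes each serialized object, readFields() can readFully() exactly the bytes that were produced, field by field. A hypothetical round-trip through the Writable-style methods, given an existing SliceWrapper original:

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(baos));

    SliceWrapper copy = new SliceWrapper(); // no-arg ctor exists for this
    copy.readFields(new DataInputStream(
            new ByteArrayInputStream(baos.toByteArray())));
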
Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalDataStorage.java Wed Apr  2 13:11:06 2008
@@ -26,7 +26,7 @@
     public Properties getConfiguration() {
         Properties config = new Properties();
         
-        config.put(DEFAULT_REPLICATION_FACTOR_KEY, (new Integer(1)).toString());
+        config.put(DEFAULT_REPLICATION_FACTOR_KEY, "" + 1);
         
         return config;
     }

Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java Wed Apr  2 13:11:06 2008
@@ -123,10 +123,12 @@
         Map<String, Object> stats = new HashMap<String, Object>();
 
         long size = this.path.length();
-        stats.put(LENGTH_KEY , (new Long(size)).toString());
+        stats.put(LENGTH_KEY, size);
+
+        stats.put(BLOCK_REPLICATION_KEY, (short) 1);
 
         long lastModified = this.path.lastModified();
-        stats.put(MODIFICATION_TIME_KEY, (new Long(lastModified)).toString());
+        stats.put(MODIFICATION_TIME_KEY, lastModified);
         
         return stats;
     }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Wed Apr  2 13:11:06 2008
@@ -17,7 +17,6 @@
  */
 package org.apache.pig.impl.io;
 
-import java.lang.IllegalArgumentException;
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -26,26 +25,24 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Random;
-import java.util.Stack;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Random;
+import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.WrappedIOException;
-
-import org.apache.pig.backend.datastorage.*;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigInputFormat.PigRecordReader;
-
-import java.util.Properties;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.SliceWrapper;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.WrappedIOException;
 
 public class FileLocalizer {
     private static final Log log = LogFactory.getLog(FileLocalizer.class);
@@ -141,19 +138,17 @@
     }
 
     /**
-     * This function is meant to be used if the mappers/reducers want to access any HDFS file
-     * @param fileName
-     * @return
-     * @throws IOException
+     * This function is meant to be used if the mappers/reducers want to access
+     * any HDFS file
      */
-    
-    public static InputStream openDFSFile(String fileName) throws IOException{
-        PigRecordReader prr = PigInputFormat.PigRecordReader.getPigRecordReader();
-        
-        if (prr == null)
-            throw new RuntimeException("can't open DFS file while executing locally");
-    
-        return openDFSFile(fileName, prr.getJobConf());
+    public static InputStream openDFSFile(String fileName) throws IOException {
+        SliceWrapper wrapper = PigInputFormat.getActiveSplit();
+
+        if (wrapper == null)
+            throw new RuntimeException(
+                    "can't open DFS file while executing locally");
+
+        return openDFSFile(fileName, wrapper.getJobConf());
         
     }
 
@@ -335,21 +330,16 @@
         }
     }
 
-    public static boolean fileExists(String filename, PigContext pigContext) throws IOException {
-        try
-        {
-            ElementDescriptor elem = pigContext.getDfs().asElement(filename);
+    public static boolean fileExists(String filename, PigContext context)
+            throws IOException {
+        return fileExists(filename, context.getDfs());
+    }
 
-            if (elem.exists()) {
-                return true;
-            }
-            else {
-                return globMatchesFiles(elem, pigContext.getDfs());
-            }
-        }
-        catch (DataStorageException e) {
-            return false;
-        }
+    public static boolean fileExists(String filename, DataStorage store)
+            throws IOException {
+        ElementDescriptor elem = store.asElement(filename);
+
+        return elem.exists() || globMatchesFiles(elem, store);
     }
 
     private static boolean globMatchesFiles(ElementDescriptor elem,

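For context, openDFSFile(String) is the task-side entry point for reading an arbitrary HDFS file; it pulls the JobConf off the active SliceWrapper. A minimal sketch of a caller follows — the path is invented, and as the RuntimeException above indicates, this only works on the Hadoop backend:

    // runs inside a map or reduce task, where a SliceWrapper is active
    InputStream is = FileLocalizer.openDFSFile("/user/pig/lookup.dat");
    try {
        // ... consume side data ...
    } finally {
        is.close();
    }
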
Added: incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/ValidatingInputFileSpec.java Wed Apr  2 13:11:06 2008
@@ -0,0 +1,59 @@
+package org.apache.pig.impl.io;
+
+import java.io.IOException;
+
+import org.apache.pig.Slicer;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.PigSlicer;
+import org.apache.pig.impl.PigContext;
+
+/**
+ * Instantiates a Slicer from its funcSpec at construction time and checks
+ * that the Slicer is valid.
+ */
+public class ValidatingInputFileSpec extends FileSpec {
+
+    // Don't send the instantiated slicer over the wire.
+    private transient Slicer slicer;
+
+    private static final long serialVersionUID = 1L;
+
+    public ValidatingInputFileSpec(FileSpec fileSpec, DataStorage store)
+            throws IOException {
+        super(fileSpec.getFileName(), fileSpec.getFuncSpec());
+        validate(store);
+    }
+
+    /**
+     * If the <code>ExecType</code> of <code>context</code> is LOCAL,
+     * validation is not performed.
+     */
+    public ValidatingInputFileSpec(String fileName, String funcSpec,
+            PigContext context) throws IOException {
+
+        super(fileName, funcSpec);
+        if (context.getExecType() != ExecType.LOCAL) {
+            validate(context.getDfs());
+        }
+    }
+
+    private void validate(DataStorage store) throws IOException {
+        getSlicer().validate(store, getFileName());
+    }
+
+    /**
+     * Returns the Slicer created by this spec's funcSpec.
+     */
+    public Slicer getSlicer() {
+        if (slicer == null) {
+            Object loader = PigContext.instantiateFuncFromSpec(getFuncSpec());
+            if (loader instanceof Slicer) {
+                slicer = (Slicer) loader;
+            } else {
+                slicer = new PigSlicer(getFuncSpec());
+            }
+        }
+        return slicer;
+    }
+}

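A rough sketch of how callers are expected to use the new spec; the file name and func spec below are invented, and pigContext is assumed to be in scope:

    // Validation runs against the DFS unless the exec type is LOCAL.
    ValidatingInputFileSpec spec =
        new ValidatingInputFileSpec("mydata", "PigStorage()", pigContext);

    // Returns the user's Slicer when the func spec names one; otherwise
    // falls back to wrapping the ordinary load func in a PigSlicer.
    Slicer slicer = spec.getSlicer();
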
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Wed Apr  2 13:11:06 2008
@@ -23,7 +23,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
@@ -40,20 +39,19 @@
 
     protected int outputType = FIXED;
     
-    protected boolean splitable = true;
+    protected boolean splittable = true;
 
     public LOLoad(Map<OperatorKey, LogicalOperator> opTable, 
                   String scope, 
                   long id, 
-                  FileSpec inputFileSpec, boolean splitable) 
+                  FileSpec inputFileSpec, boolean splittable) 
     throws IOException, ParseException {
         super(opTable, scope, id);
         this.inputFileSpec = inputFileSpec;
-        this.splitable = splitable;
+        this.splittable = splittable;
         
         // check if we can instantiate load func
-        LoadFunc storageFunc = (LoadFunc) PigContext
-                .instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
+        PigContext.instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
 
         // TODO: Handle Schemas defined by Load Functions
         schema = new TupleSchema();
@@ -116,8 +114,8 @@
         return funcs;
     }
 
-    public boolean isSplitable() {
-        return splitable;
+    public boolean isSplittable() {
+        return splittable;
     }
     
     public void visit(LOVisitor v) {

Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java Wed Apr  2 13:11:06 2008
@@ -64,7 +64,7 @@
 
         if (parentLoad) {
             EvalSpec spec = e.getSpec();
-            if (spec instanceof StreamSpec && !load.isSplitable()) {
+            if (spec instanceof StreamSpec && !load.isSplittable()) {
                 // Try and optimize if the load and stream input specs match
                 // and input files are to be processed as-is
                 StreamSpec streamSpec = (StreamSpec)spec;

Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Apr  2 13:11:06 2008
@@ -124,20 +124,12 @@
 	
 	
 	
-	String massageFilename(String filename, PigContext pigContext, boolean checkExists) 
+	String massageFilename(String filename, PigContext pigContext)
 	throws IOException, ParseException {
 		if (pigContext.getExecType() != ExecType.LOCAL) {
 			if (filename.startsWith(FileLocalizer.LOCAL_PREFIX)) {
 					filename = FileLocalizer.hadoopify(filename, pigContext);
 			} 
-			else
-			{
-				// make sure that dfs file exists
-				if (checkExists && !FileLocalizer.fileExists(filename, pigContext))
-				{
-					throw new ParseException(FileLocalizer.fullPath(filename, pigContext) + " does not exist");
-				}
-			}
 		}
 		return filename;
 	}
@@ -689,7 +681,13 @@
 			funcSpec += continuous ? "('\t','\n','0')" : "()";
 		}
 		 
-		lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext, true), funcSpec), splitable);	
+        try {
+            lo = new LOLoad(opTable, scope, getNextId(), 
+                        new ValidatingInputFileSpec(massageFilename(filename, pigContext), 
+                                               funcSpec, pigContext), splitable);	
+        } catch(IOException e) {
+            throw new ParseException(e.getMessage());
+        }
 		if (continuous)
 			lo.setOutputType(LogicalOperator.MONOTONE);
 		return lo;
@@ -1507,7 +1505,7 @@
         }
          
         LogicalPlan readFrom = aliases.get(t.image);
-        String jobOutputFile = massageFilename(fileName, pigContext, false);
+        String jobOutputFile = massageFilename(fileName, pigContext);
         lo = new LOStore(opTable, scope, getNextId(), readFrom.getRoot(),
                          new FileSpec(jobOutputFile, functionSpec),
                          false);
@@ -1582,4 +1580,4 @@
 	{
         command.setLogDir(alias);
 	}
-}
\ No newline at end of file
+}

Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=644033&r1=644032&r2=644033&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Wed Apr  2 13:11:06 2008
@@ -13,7 +13,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.fs.permission.AccessControlException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -306,43 +305,28 @@
                     
                     if (mDfs.isContainer(curElem.toString())) {
                            System.out.println(curElem.toString() + "\t<dir>");
-                    }
-                    else {
-                        Properties config = curElem.getConfiguration();
-                        Map<String, Object> stats = curElem.getStatistics();
-                        
-                        String strReplication = config.getProperty(ElementDescriptor.BLOCK_REPLICATION_KEY);
-                        String strLen = (String) stats.get(ElementDescriptor.LENGTH_KEY);
-
-                        StringBuilder sb = new StringBuilder();
-                        sb.append(curElem.toString());
-                        sb.append("<r ");
-                        sb.append(strReplication);
-                        sb.append(">\t");
-                        sb.append(strLen);
-                        System.out.println(sb.toString());
+                    } else {
+                        printLengthAndReplication(curElem);
                     }
                 }
-            }
-            else {
-                Properties config = pathDescriptor.getConfiguration();
-                Map<String, Object> stats = pathDescriptor.getStatistics();
-
-                String strReplication = (String) config.get(ElementDescriptor.BLOCK_REPLICATION_KEY);
-                String strLen = (String) stats.get(ElementDescriptor.LENGTH_KEY);
-
-                StringBuilder sb = new StringBuilder();
-                sb.append(pathDescriptor.toString());
-                sb.append("<r ");
-                sb.append(strReplication);
-                sb.append(">\t");
-                sb.append(strLen);
-                System.out.println(sb.toString());
+            } else {
+                printLengthAndReplication(pathDescriptor);
             }
         }
         catch (DataStorageException e) {
             throw WrappedIOException.wrap("Failed to LS on " + path, e);
         }
+    }
+
+    private void printLengthAndReplication(ElementDescriptor elem)
+            throws IOException {
+        Map<String, Object> stats = elem.getStatistics();
+
+        long replication = (Short) stats
+                .get(ElementDescriptor.BLOCK_REPLICATION_KEY);
+        long len = (Long) stats.get(ElementDescriptor.LENGTH_KEY);
+
+        System.out.println(elem.toString() + "<r " + replication + ">\t" + len);
     }
     
     protected void processPWD() throws IOException 

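For reference, the new helper prints each file as path, replication, and length on one tab-separated line, e.g. (all values invented):

    grunt> ls
    /user/pig/mydata<r 3>	1024
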
Added: incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/RangeSlicer.java Wed Apr  2 13:11:06 2008
@@ -0,0 +1,100 @@
+package org.apache.pig.test;
+
+import java.io.IOException;
+
+import org.apache.pig.Slice;
+import org.apache.pig.Slicer;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Makes one slice for each value from 0 to location - 1; each slice yields
+ * a single value, its index in the sequence.
+ */
+public class RangeSlicer
+    implements Slicer
+{
+
+    /**
+     * Expects location to be a Stringified integer, and makes
+     * Integer.parseInt(location) slices. Each slice generates a single value,
+     * its index in the sequence of slices.
+     */
+    public Slice[] slice (DataStorage store, String location)
+        throws IOException
+    {
+        int numslices = Integer.parseInt(location);
+        Slice[] slices = new Slice[numslices];
+        for (int i = 0; i < slices.length; i++) {
+            slices[i] = new SingleValueSlice(i);
+        }
+        return slices;
+    }
+
+    public void validate(DataStorage store, String location) throws IOException {
+        try {
+            Integer.parseInt(location);
+        } catch (NumberFormatException nfe) {
+            throw new IOException(nfe.getMessage());
+        }
+    }
+
+    /**
+     * A Slice that returns a single value from next.
+     */
+    public static class SingleValueSlice
+        implements Slice
+    {
+        public int val;
+
+        private transient boolean read;
+
+        public SingleValueSlice (int value)
+        {
+            this.val = value;
+        }
+
+        public void close ()
+            throws IOException
+        {}
+
+        public long getLength ()
+        {
+            return 1;
+        }
+
+        public String[] getLocations ()
+        {
+            return new String[0];
+        }
+
+        public long getPos ()
+            throws IOException
+        {
+            return read ? 1 : 0;
+        }
+
+        public float getProgress ()
+            throws IOException
+        {
+            return read ? 1 : 0;
+        }
+
+        public void init (DataStorage store)
+            throws IOException
+        {}
+
+        public boolean next (Tuple value)
+            throws IOException
+        {
+            if (!read) {
+                value.appendField(new DataAtom(val));
+                read = true;
+                return true;
+            }
+            return false;
+        }
+
+        private static final long serialVersionUID = 1L;
+    }
+}

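To make the Slice lifecycle concrete, here is a hypothetical driver loop over one of these slices. This is illustrative only, not the actual backend code; store is assumed to be some DataStorage instance:

    Slice slice = new RangeSlicer.SingleValueSlice(7);
    slice.init(store);
    Tuple t = new Tuple();
    while (slice.next(t)) {
        // the first call appends DataAtom(7) and returns true;
        // the second call returns false and ends the loop
    }
    slice.close();
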
Added: incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestCustomSlicer.java Wed Apr  2 13:11:06 2008
@@ -0,0 +1,34 @@
+package org.apache.pig.test;
+
+import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+public class TestCustomSlicer extends TestCase {
+    /**
+     * Uses RangeSlicer in place of pig's default Slicer to generate a few
+     * values and count them.
+     */
+    @Test
+    public void testUseRangeSlicer() throws ExecException, IOException {
+        PigServer pig = new PigServer(MAPREDUCE);
+        int numvals = 50;
+        String query = "vals = foreach (group (load '"
+                + numvals
+                + "'using org.apache.pig.test.RangeSlicer()) all) generate COUNT($1);";
+        pig.registerQuery(query);
+        Iterator<Tuple> it = pig.openIterator("vals");
+        Tuple cur = it.next();
+        DataAtom val = cur.getAtomField(0);
+        assertEquals(numvals, (int) val.longVal());
+    }
+}

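The embedded query, spelled out for readability (with numvals = 50):

    vals = foreach (group (load '50' using org.apache.pig.test.RangeSlicer()) all)
           generate COUNT($1);
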
Added: incubator/pig/trunk/test/org/apache/pig/test/TestParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestParser.java?rev=644033&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestParser.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestParser.java Wed Apr  2 13:11:06 2008
@@ -0,0 +1,22 @@
+package org.apache.pig.test;
+
+import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+
+public class TestParser extends TestCase {
+
+    public void testLoadingNonexistentFile() throws ExecException, IOException {
+        PigServer pig = new PigServer(MAPREDUCE);
+        try {
+            pig.registerQuery("vals = load 'nonexistentfile';");
+            fail("Loading a  nonexistent file should throw an IOException at parse time");
+        } catch (IOException io) {
+        }
+    }
+}