You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/04/01 23:22:42 UTC

svn commit: r643583 - in /incubator/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/ src/org/apache/pig/backend/hadoop/streaming/ src/org/apache/pig/bu...

Author: olga
Date: Tue Apr  1 14:22:38 2008
New Revision: 643583

URL: http://svn.apache.org/viewvc?rev=643583&view=rev
Log:
PIG-94: M3 of streaming

Added:
    incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
    incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/PigServer.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
    incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Tue Apr  1 14:22:38 2008
@@ -195,3 +195,5 @@
 
 	PIG-122: Added build and src-gen to the list of ignore files in
 	the top level directory (joa23 via gates).
+
+    PIG-94: M3 code update for streaming

Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Tue Apr  1 14:22:38 2008
@@ -51,6 +51,9 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.streaming.LoadOptimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.streaming.StoreOptimizer;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
@@ -275,7 +278,13 @@
         
         // Check if we just processed a LOStore i.e. STORE
         if (op instanceof LOStore) {
-            runQuery(lp);
+            try {
+                optimizeAndRunQuery(lp);
+            }
+            catch (ExecException e) {
+                throw WrappedIOException.wrap("Unable to store alias " + 
+                        lp.getAlias(), e);
+            }
         }
     }
       
@@ -306,12 +315,10 @@
         // execution.
         
         LogicalPlan readFrom = (LogicalPlan) aliases.get(id);
-
+        
+        // Run
         try {
-            ExecPhysicalPlan pp = 
-                pigContext.getExecutionEngine().compile(readFrom, null);
-            
-            ExecJob job = pigContext.getExecutionEngine().execute(pp);
+            ExecJob job = optimizeAndRunQuery(readFrom);
 
             // invocation of "execute" is synchronous!
             if (job.getStatus() == JOB_STATUS.COMPLETED) {
@@ -365,21 +372,36 @@
                                                               func,
                                                               pigContext);
 
-        runQuery(storePlan);
-    }
+        // Optimize 
+        Optimizer optimizer = new LoadOptimizer();
+        optimizer.optimize(readFrom);
 
-    private void runQuery(LogicalPlan storePlan) throws IOException {
-        try {
-            ExecPhysicalPlan pp = 
-                pigContext.getExecutionEngine().compile(storePlan, null);
 
-            pigContext.getExecutionEngine().execute(pp);
+        try {
+            optimizeAndRunQuery(storePlan);
         }
         catch (ExecException e) {
             throw WrappedIOException.wrap("Unable to store alias " + 
-                                          storePlan.getAlias(), e);
+                    storePlan.getAlias(), e);
+
         }
     }
+
+    private ExecJob optimizeAndRunQuery(LogicalPlan root) throws ExecException {
+        // Optimize the LogicalPlan
+        Optimizer loadOptimizer = new LoadOptimizer();
+        loadOptimizer.optimize(root);
+
+        Optimizer storeOptimizer = new StoreOptimizer();
+        storeOptimizer.optimize(root);
+
+        // Execute
+        ExecPhysicalPlan pp = 
+            pigContext.getExecutionEngine().compile(root, null);
+
+        return pigContext.getExecutionEngine().execute(pp);
+    }
+    
     /**
      * Provide information on how a pig query will be executed.  For now
      * this information is very developer focussed, and probably not very

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Tue Apr  1 14:22:38 2008
@@ -193,6 +193,8 @@
             FileSpec fileSpec = new FileSpec(filename, loLoad.getInputFileSpec().getFuncSpec());
             pom.addInputFile(fileSpec);
             pom.mapParallelism = Math.max(pom.mapParallelism, lo.getRequestedParallelism());
+            pom.setProperty("pig.input.splitable", 
+                            Boolean.toString(loLoad.isSplitable()));
             return pom.getOperatorKey();
         } 
         else if (lo instanceof LOStore) {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Tue Apr  1 14:22:38 2008
@@ -259,6 +259,14 @@
         properties.putAll(spec.getProperties());
     }
     
+    public void setProperty(String key, String value) {
+        properties.setProperty(key, value);
+    }
+    
+    public String getProperty(String key) {
+        return properties.getProperty(key);
+    }
+    
     public void visit(POVisitor v) {
         v.visitMapreduce(this);
     }

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java Tue Apr  1 14:22:38 2008
@@ -49,6 +49,7 @@
 public class PigInputFormat implements InputFormat<Text, Tuple>, JobConfigurable {
 
     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+        boolean isSplitable = job.getBoolean("pig.input.splitable", false);
         
         ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer.deserialize(job.get("pig.inputs"));        
         ArrayList<EvalSpec> mapFuncs = (ArrayList<EvalSpec>) ObjectSerializer.deserialize(job.get("pig.mapFuncs",""));        
@@ -109,8 +110,9 @@
                 long size = fs.getFileStatus(fullPath).getLen();
                 long pos = 0;
                 String name = paths.get(j).getName();
-                if (name.endsWith(".gz")) {
-                    // Anything that ends with a ".gz" we must process as a complete file
+                if (name.endsWith(".gz") || !isSplitable) {
+                    // Anything that ends with a ".gz" or can't be split
+                    // we must process as a complete file
                     splits.add(new PigSplit(pigContext, fs, fullPath, parser, groupFuncs==null ? null : groupFuncs.get(i), mapFuncs.get(i), i, 0, size));
                 } else {
                     while (pos < size) {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Tue Apr  1 14:22:38 2008
@@ -108,16 +108,23 @@
 
         setupMapPipe(properties, reporter);
 
-        // allocate key & value instances that are re-used for all entries
-        WritableComparable key = input.createKey();
-        Writable value = input.createValue();
-        while (input.next(key, value)) {
-            evalPipe.add((Tuple) value);
-        }
-        evalPipe.finishPipe();  // EOF marker
-        evalPipe= null;
-        if (pigWriter != null) {
-            pigWriter.close(reporter);
+        try {
+            // allocate key & value instances that are re-used for all entries
+            WritableComparable key = input.createKey();
+            Writable value = input.createValue();
+            while (input.next(key, value)) {
+                evalPipe.add((Tuple) value);
+            }
+        } finally {
+            try {
+                evalPipe.finishPipe();  // EOF marker
+                evalPipe = null;
+            } finally {
+                // Close the writer
+                if (pigWriter != null) {
+                    pigWriter.close(reporter);
+                }
+            }
         }
     }
 

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Tue Apr  1 14:22:38 2008
@@ -111,35 +111,37 @@
     }
 
     public void close() throws IOException, ExecException {
-        super.close();
-        
-        // Footer for stderr file of the task
-        writeDebugFooter();
-        
-        // Copy the secondary outputs of the task to HDFS
-        Path scriptOutputDir = new Path(this.scriptOutputDir);
-        FileSystem fs = scriptOutputDir.getFileSystem(job);
-        List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
-        if (outputSpecs != null) {
-            for (int i=1; i < outputSpecs.size(); ++i) {
-                String fileName = outputSpecs.get(i).getName();
-                try {
-                    fs.copyFromLocalFile(false, true, new Path(fileName), 
-                                         new Path(scriptOutputDir, 
-                                                  taskId+"-"+fileName)
-                                        );
-                } catch (IOException ioe) {
-                    System.err.println("Failed to save secondary output '" + 
-                                       fileName + "' of task: " + taskId +
-                                       " with " + ioe);
-                    throw new ExecException(ioe);
+        try {
+            super.close();
+
+            // Copy the secondary outputs of the task to HDFS
+            Path scriptOutputDir = new Path(this.scriptOutputDir);
+            FileSystem fs = scriptOutputDir.getFileSystem(job);
+            List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+            if (outputSpecs != null) {
+                for (int i=1; i < outputSpecs.size(); ++i) {
+                    String fileName = outputSpecs.get(i).getName();
+                    try {
+                        fs.copyFromLocalFile(false, true, new Path(fileName), 
+                                new Path(scriptOutputDir, 
+                                        taskId+"-"+fileName)
+                        );
+                    } catch (IOException ioe) {
+                        System.err.println("Failed to save secondary output '" + 
+                                           fileName + "' of task: " + taskId +
+                                           " with " + ioe);
+                        throw new ExecException(ioe);
+                    }
                 }
-            }
         }
-
-        // Close the stderr file on HDFS
-        if (errorStream != null) {
-            errorStream.close();
+        } finally {
+            // Footer for stderr file of the task
+            writeDebugFooter();
+            
+            // Close the stderr file on HDFS
+            if (errorStream != null) {
+                errorStream.close();
+            }
         }
     }
 
@@ -152,11 +154,14 @@
      *         HDFS, <code>false</code> otherwise
      */
     private boolean writeErrorToHDFS(int limit, String taskId) {
-        // These are hard-coded begin/end offsets a Hadoop *taskid*
-        int beginIndex = 25, endIndex = 31;   
-        
-        int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
-        return command.getPersistStderr() && tipId < command.getLogFilesLimit();
+        if (command.getPersistStderr()) {
+            // These are hard-coded begin/end offsets into a Hadoop *taskid*
+            int beginIndex = 25, endIndex = 31;   
+
+            int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+            return tipId < command.getLogFilesLimit();
+        }
+        return false;
     }
     
     protected void processError(String error) {
@@ -175,7 +180,12 @@
     private void writeDebugHeader() {
         processError("===== Task Information Header =====" );
 
-        processError("\nCommand: " + command.getExecutable());
+        StringBuffer sb = new StringBuffer();
+        for (String arg : command.getCommandArgs()) {
+            sb.append(arg);
+            sb.append(" ");
+        }
+        processError("\nCommand: " + sb.toString());
         processError("\nStart time: " + new Date(System.currentTimeMillis()));
         processError("\nInput-split file: " + job.get("map.input.file"));
         processError("\nInput-split start-offset: " + 
@@ -195,6 +205,7 @@
         List<HandleSpec> inputSpecs = command.getHandleSpecs(Handle.INPUT);
         HandleSpec inputSpec = 
             (inputSpecs != null) ? inputSpecs.get(0) : null;
+        processError("\nInput records: " + inputRecords);
         processError("\nInput bytes: " + inputBytes + " bytes " +
                     ((inputSpec != null) ? 
                             "(" + inputSpec.getName() + " using " + 
@@ -204,6 +215,7 @@
         List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
         HandleSpec outputSpec = 
             (outputSpecs != null) ? outputSpecs.get(0) : null;
+        processError("\nOutput records: " + outputRecords);
         processError("\nOutput bytes: " + outputBytes + " bytes " +
                      ((outputSpec != null) ? 
                          "(" + outputSpec.getName() + " using " + 

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Tue Apr  1 14:22:38 2008
@@ -95,4 +95,8 @@
         out.write(Tuple.RECORD_3);
         t.write(out);
     }
+
+    public boolean equals(Object obj) {
+        return true;
+    }
 }

Added: incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java Tue Apr  1 14:22:38 2008
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+/**
+ * {@link BinaryStorage} is a simple, as-is, serializer/deserializer pair.
+ * 
+ * It is a {@link LoadFunc} which loads all the given data from the given
+ * {@link InputStream} into a single {@link Tuple}, and a {@link StoreFunc}
+ * which writes out all input data as a single <code>Tuple</code>.
+ * 
+ * <code>BinaryStorage</code> is intended to work in cases where input files
+ * are to be sent in-whole for processing without any splitting and 
+ * interpretation of their data.
+ */
+public class BinaryStorage implements LoadFunc, StoreFunc {
+    // LoadFunc
+    private static final int DEFAULT_BUFFER_SIZE = 64*1024;
+    protected int bufferSize = DEFAULT_BUFFER_SIZE;
+
+    protected BufferedPositionedInputStream in = null;
+    protected long offset = 0;
+    protected long end = Long.MAX_VALUE;
+
+    // StoreFunc
+    OutputStream out;
+    
+    /**
+     * Create a <code>BinaryStorage</code> with default buffer size for reading
+     * inputs.
+     */
+    public BinaryStorage() {}
+    
+    /**
+     * Create a <code>BinaryStorage</code> with the given buffer-size for 
+     * reading inputs.
+     * 
+     * @param bufferSize buffer size to be used
+     */
+    public BinaryStorage(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+    
+    public void bindTo(String fileName, BufferedPositionedInputStream in,
+            long offset, long end) throws IOException {
+        this.in = in;
+        this.offset = offset;
+        this.end = end;
+    }
+
+    public Tuple getNext() throws IOException {
+        // Sanity check
+        if (in == null || in.getPosition() > end) {
+            return null;
+        }
+     
+        // Copy all data into the buffer
+        byte[] buffer = new byte[bufferSize];
+        int off = 0;
+        int len = bufferSize;
+        int n = 0;
+        while (len > 0 && (n = in.read(buffer, off, len)) != -1) {
+            off += n;
+            len -= n;
+        }
+        
+        if (n == -1) {
+            // Copy out the part-buffer and send it
+            byte[] copy = new byte[off];
+            System.arraycopy(buffer, 0, copy, 0, copy.length);
+            buffer = copy;
+        }
+
+        // Create a new Tuple with one DataAtom field and return it, 
+        // ensure that we return 'null' if we didn't get any data
+        if (off > 0) {
+            return new Tuple(new DataAtom(buffer));
+        }
+        
+        return null;
+    }
+
+    public void bindTo(OutputStream out) throws IOException {
+        this.out = out;
+    }
+
+    public void finish() throws IOException {}
+
+    public void putNext(Tuple f) throws IOException {
+        // Pick up the first field of the Tuple, get its
+        // raw bytes, and send them out
+        DataAtom dAtom = (DataAtom)(f.getAtomField(0));
+        byte[] data = dAtom.getValueBytes();
+        if (data.length > 0) {
+            out.write(dAtom.getValueBytes());
+            out.flush();
+        }
+    }
+    
+    public String toString() {
+        return "BinaryStorage(" + bufferSize + ")";
+    }
+    
+    public boolean equals(Object obj) {
+        return true;
+    }
+}

Modified: incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Tue Apr  1 14:22:38 2008
@@ -89,4 +89,12 @@
     public void finish() throws IOException {
     }
 
+    public boolean equals(Object obj) {
+        return equals((PigStorage)obj);
+    }
+
+    public boolean equals(PigStorage other) {
+        return this.fieldDel.equals(other.fieldDel);
+    }
+    
 }

Modified: incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/data/DataAtom.java Tue Apr  1 14:22:38 2008
@@ -20,11 +20,19 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.hadoop.io.WritableComparator;
 
 /**
- * The basic data unit. For now, we represent all atomic data objects as strings
+ * The basic data unit. 
+ * 
+ * We represent all atomic data objects as strings or raw-bytes.
  */
 final public class DataAtom extends Datum {
+    private enum Type {BINARY, STRING};
+    
+    Type type = Type.STRING;
     String stringVal = null;
     Double doubleVal = null;
     public static String EMPTY = "";
@@ -62,6 +70,8 @@
     
     public void setValue(byte[] valIn) {
         binaryVal = valIn;
+        type = Type.BINARY;
+        
         stringVal = null;
         doubleVal = Double.POSITIVE_INFINITY;
     }
@@ -85,6 +95,22 @@
         stringVal = Double.toString(valIn);
     }
 
+    public byte[] getValueBytes() {
+        byte[] data = null;
+        
+        if (type == Type.STRING) {
+            try {
+                data = stringVal.getBytes("UTF-8");
+            } catch (UnsupportedEncodingException uee) {
+                data = null;
+            }
+        } else {
+            data = binaryVal;
+        }
+
+        return data;
+    }
+    
     public String strval() {
         return stringVal;
     }
@@ -120,37 +146,52 @@
             return -1;
         DataAtom dOther = (DataAtom) other;
         
-        return stringVal.compareTo(dOther.stringVal);
+        return (type == Type.STRING ) ? stringVal.compareTo(dOther.stringVal) : 
+            WritableComparator.compareBytes(binaryVal, 0, binaryVal.length, 
+                                            dOther.binaryVal, 0, 
+                                            dOther.binaryVal.length);
             
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
          out.write(ATOM);
+         out.writeUTF(type.toString());
          byte[] data;
-         try {
-             data = strval().getBytes("UTF-8");
-         } catch (Exception e) {
-             long size = strval().length();
-             throw new RuntimeException("Error dealing with DataAtom of size " + size, e);
+         if (type == Type.BINARY) {
+             data = binaryVal;
+         } else { 
+             try {
+                 data = strval().getBytes("UTF-8");
+             } catch (Exception e) {
+                 long size = strval().length();
+                 throw new RuntimeException("Error dealing with DataAtom of size " + size, e);
+             }
          }
          Tuple.encodeInt(out, data.length);
          out.write(data);    
     }
     
     static DataAtom read(DataInput in) throws IOException {
+        Type type = Type.valueOf(in.readUTF());
         int len = Tuple.decodeInt(in);
         DataAtom ret = new DataAtom();
         byte[] data = new byte[len];
         in.readFully(data);
-        ret.setValue(new String(data, "UTF-8"));
+        if (type == Type.STRING) {
+            ret.setValue(new String(data, "UTF-8"));
+        }
+        else {
+            ret.setValue(data);
+        }
         return ret;
     }
 
     
     @Override
     public int hashCode() {
-        return stringVal.hashCode();
+        return (type == Type.STRING) ? stringVal.hashCode() : 
+            WritableComparator.hashBytes(binaryVal, binaryVal.length);
     }
 
     @Override

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Tue Apr  1 14:22:38 2008
@@ -68,16 +68,22 @@
         }
         properties.setProperty(property, sb.toString());        
     }
+
+    /**
+     * Get the {@link StreamingCommand} for this <code>StreamSpec</code>.
+     * @return the {@link StreamingCommand} held by this <code>StreamSpec</code>
+     */
+    public StreamingCommand getCommand() {
+        return command;
+    }
     
-    @Override
     public List<String> getFuncs() {
         // No user-defined functions here
         return new ArrayList<String>();
     }
 
     protected Schema mapInputSchema(Schema schema) {
-        // EvalSpec _has_ to have the schema if there is one...
-        return null;
+        return schema;
     }
 
     protected DataCollector setupDefaultPipe(Properties properties,
@@ -120,7 +126,7 @@
                 // Start the executable
                 this.executableManager.run();
             } catch (Exception e) {
-                LOG.fatal("Failed to create/start PigExecutableManager with: " + 
+                LOG.fatal("Failed to create/start ExecutableManager with: " + 
                           e);
                 e.printStackTrace();
                 throw new RuntimeException(e);
@@ -142,7 +148,7 @@
                 executableManager.close();
             }
             catch (Exception e) {
-                LOG.fatal("Failed to close PigExecutableManager with: " + e);
+                LOG.fatal("Failed to close ExecutableManager with: " + e);
                 throw new RuntimeException(e);
             }
         }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/collector/DataCollector.java Tue Apr  1 14:22:38 2008
@@ -19,6 +19,8 @@
 
 import java.util.Iterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
@@ -31,6 +33,9 @@
  * or a file.
  */
 public abstract class DataCollector {
+    private static final Log LOG = 
+        LogFactory.getLog(DataCollector.class.getName());
+    
     Integer staleCount = 0;
     protected boolean inTheMiddleOfBag = false;
     
@@ -121,10 +126,21 @@
         }
     }
     
-    public void finishPipe(){
-        finish();
-        if (successor!=null)
-            successor.finishPipe();
+    public final void finishPipe() {
+        try {
+            finish();
+        } finally {
+            try {
+                if (successor != null) {
+                    successor.finishPipe();
+                } 
+            } catch (Exception e) {
+                // Ignore this exception since the original is more relevant
+                LOG.debug(e);
+            } finally {
+                successor = null;
+            }
+        }
     }
     
     protected void finish(){}

Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Tue Apr  1 14:22:38 2008
@@ -39,14 +39,17 @@
     protected FileSpec inputFileSpec;
 
     protected int outputType = FIXED;
-
+    
+    protected boolean splitable = true;
 
     public LOLoad(Map<OperatorKey, LogicalOperator> opTable, 
                   String scope, 
                   long id, 
-                  FileSpec inputFileSpec) throws IOException, ParseException {
+                  FileSpec inputFileSpec, boolean splitable) 
+    throws IOException, ParseException {
         super(opTable, scope, id);
         this.inputFileSpec = inputFileSpec;
+        this.splitable = splitable;
         
         // check if we can instantiate load func
         LoadFunc storageFunc = (LoadFunc) PigContext
@@ -113,6 +116,10 @@
         return funcs;
     }
 
+    public boolean isSplitable() {
+        return splitable;
+    }
+    
     public void visit(LOVisitor v) {
         v.visitLoad(this);
     }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Tue Apr  1 14:22:38 2008
@@ -62,6 +62,9 @@
         return outputFileSpec;
     }
 
+    public void setOutputFileSpec(FileSpec spec) {
+        outputFileSpec = spec;
+    }
 
     @Override
     public String toString() {

Added: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/Optimizer.java Tue Apr  1 14:22:38 2008
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+
+/**
+ * {@link Optimizer} is a simple {@link LogicalPlan} optimizer.
+ * 
+ * It <em>visits</em> every node in the <code>LogicalPlan</code> and then
+ * optimizes the <code>LogicalPlan</code>.
+ */
+public abstract class Optimizer extends LOVisitor {
+    /**
+     * Optimize the given {@link LogicalPlan} if feasible and return status.
+     * 
+     * @param root root of the {@link LogicalPlan} to optimize
+     * @return <code>true</code> if optimization was feasible and was effected,
+     *         <code>false</code> otherwise.
+     */
+    abstract public boolean optimize(LogicalPlan root);
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java Tue Apr  1 14:22:38 2008
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer.streaming;
+
+import java.util.List;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.builtin.BinaryStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.StreamSpec;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOEval;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link LoadOptimizer} tries to optimize away the deserialization done by Pig 
+ * for the simple case of a LOAD whose input is not to be split (i.e. 
+ * <code>split by 'file'</code>) followed by a STREAM operator, both with 
+ * equivalent {@link LoadFunc} specifications.
+ * 
+ * In such cases it is safe to replace both <code>LoadFunc</code> specifications 
+ * with a {@link BinaryStorage}, which doesn't interpret the input bytes at all.
+ */
+public class LoadOptimizer extends Optimizer {
+    boolean optimize = false;
+    boolean parentLoad = false;
+    LOLoad load = null;
+    
+    public void visitCogroup(LOCogroup g) {
+        super.visitCogroup(g);
+        parentLoad = false;
+    }
+
+    public void visitEval(LOEval e) {
+        super.visitEval(e);
+
+        if (parentLoad) {
+            EvalSpec spec = e.getSpec();
+            if (spec instanceof StreamSpec && !load.isSplitable()) {
+                // Try and optimize if the load and stream input specs match
+                // and input files are to be processed as-is
+                StreamSpec streamSpec = (StreamSpec)spec;
+                StreamingCommand command = streamSpec.getCommand();
+                List<HandleSpec> inputSpecs = 
+                    command.getHandleSpecs(Handle.INPUT); 
+                HandleSpec streamInputSpec = 
+                    (inputSpecs == null) ? 
+                            new HandleSpec("stdin" , "PigStorage()") :
+                            inputSpecs.get(0);
+                
+                FileSpec loadFileSpec = load.getInputFileSpec();
+                
+                // Instantiate both LoadFunc objects to compare them for 
+                // equality
+                LoadFunc streamLoader = 
+                    (LoadFunc)PigContext.instantiateFuncFromSpec(
+                            streamInputSpec.getSpec());
+                
+                LoadFunc inputLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(
+                                                loadFileSpec.getFuncSpec());
+                
+                if (streamLoader.equals(inputLoader)) {
+                    // Since both are the same, we can swap them 
+                    // for BinaryStorage
+                    load.setInputFileSpec(new FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName()));
+                    streamInputSpec.setSpec(BinaryStorage.class.getName());
+                    
+                    optimize = true;
+                }
+            }
+        }
+        
+        parentLoad = false;
+    }
+
+    public void visitLoad(LOLoad load) {
+        super.visitLoad(load);
+        parentLoad = true;
+        this.load = load;
+    }
+
+    public void visitSort(LOSort s) {
+        super.visitSort(s);
+        parentLoad = false;
+    }
+
+    public void visitSplit(LOSplit s) {
+        super.visitSplit(s);
+        parentLoad = false;
+    }
+
+    public void visitSplitOutput(LOSplitOutput s) {
+        super.visitSplitOutput(s);
+        parentLoad = false;
+    }
+
+    public void visitStore(LOStore s) {
+        super.visitStore(s);
+        parentLoad = false;
+    }
+
+    public void visitUnion(LOUnion u) {
+        super.visitUnion(u);
+        parentLoad = false;
+    }
+
+    public boolean optimize(LogicalPlan root) {
+        LogicalOperator r = root.getOpTable().get(root.getRoot());
+        r.visit(this);
+        return optimize;
+    }
+}

Added: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java Tue Apr  1 14:22:38 2008
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer.optimizer.streaming;
+
+import java.util.List;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.builtin.BinaryStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.StreamSpec;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOEval;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link StoreOptimizer} tries to optimize away the serialization done by Pig 
+ * for the simple case of a STREAM followed by a STORE operator; both with
+ * equivalent {@link StoreFunc} specifications.
+ * 
+ * In such cases it is safe to replace the <code>StoreFunc</code>
+ * specifications with a {@link BinaryStorage} which doesn't interpret the
+ * output bytes at all.
+ */
+public class StoreOptimizer extends Optimizer {
+    boolean optimize = false;
+    boolean parentEval = false;
+    LOEval eval = null;
+    
+    public void visitCogroup(LOCogroup g) {
+        super.visitCogroup(g);
+        parentEval = false;
+    }
+
+    public void visitEval(LOEval e) {
+        super.visitEval(e);
+        eval = e;
+        parentEval = true;
+    }
+
+    public void visitLoad(LOLoad load) {
+        super.visitLoad(load);
+        parentEval = false;
+    }
+
+    public void visitSort(LOSort s) {
+        super.visitSort(s);
+        parentEval = false;
+    }
+
+    public void visitSplit(LOSplit s) {
+        super.visitSplit(s);
+        parentEval = false;
+    }
+
+    public void visitSplitOutput(LOSplitOutput s) {
+        super.visitSplitOutput(s);
+        parentEval = false;
+    }
+
+    public void visitStore(LOStore s) {
+        super.visitStore(s);
+        
+        if (parentEval) {
+            EvalSpec spec = eval.getSpec();
+            if (spec instanceof StreamSpec) {
+                // Try and optimize if the store and stream output specs match
+                StreamSpec streamSpec = (StreamSpec)spec;
+                StreamingCommand command = streamSpec.getCommand();
+                List<HandleSpec> outputSpecs = 
+                    command.getHandleSpecs(Handle.OUTPUT); 
+                HandleSpec streamOutputSpec = 
+                    (outputSpecs == null) ? 
+                            new HandleSpec("stdout" , "PigStorage()") :
+                            outputSpecs.get(0);
+                
+                FileSpec storeFileSpec = s.getOutputFileSpec();
+                
+                // Instantiate both StoreFunc objects to compare them for 
+                // equality
+                StoreFunc streamStorer = 
+                    (StoreFunc)PigContext.instantiateFuncFromSpec(
+                            streamOutputSpec.getSpec());
+                
+                StoreFunc outputStorer = (StoreFunc)PigContext.instantiateFuncFromSpec(
+                                                storeFileSpec.getFuncSpec());
+                
+                if (streamStorer.equals(outputStorer)) {
+                    // Since both are the same, we can swap them 
+                    // for BinaryStorage
+                    s.setOutputFileSpec(new FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName()));
+                    streamOutputSpec.setSpec(BinaryStorage.class.getName());
+                    
+                    optimize = true;
+                }
+            }
+        }
+
+        parentEval = false;
+    }
+
+    public void visitUnion(LOUnion u) {
+        super.visitUnion(u);
+        parentEval = false;
+    }
+
+    public boolean optimize(LogicalPlan root) {
+        LogicalOperator r = root.getOpTable().get(root.getRoot());
+        r.visit(this);
+        return optimize;
+    }
+}

Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Apr  1 14:22:38 2008
@@ -662,8 +662,8 @@
 	{return op;}
 }
 
-LogicalOperator LoadClause() : {Token t1, t2; String filename; String funcName,funcArgs, funcSpec=null; 
-								LOLoad lo=null; boolean continuous=false;}
+LogicalOperator LoadClause() : {Token t1, t2, t3; String filename; String funcName,funcArgs, funcSpec=null; 
+								LOLoad lo=null; boolean continuous=false; String splitBy; boolean splitable = true;}
 {
 	(	filename = FileName()
 		(
@@ -672,6 +672,15 @@
 			funcSpec = funcName + "(" + funcArgs + ")";
 		}
 		)?
+		(
+		<SPLIT> <BY> t3 = <QUOTEDSTRING>
+		{
+			splitBy = unquote(t3.image);
+			if (splitBy.equalsIgnoreCase("file")) {
+				splitable = false;
+			}
+		}
+		)?
 	)
 	[ <CONTINUOUSLY> {continuous=true;} ] 
 	{
@@ -680,22 +689,33 @@
 			funcSpec += continuous ? "('\t','\n','0')" : "()";
 		}
 		 
-		lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext, true), funcSpec));	
+		lo = new LOLoad(opTable, scope, getNextId(), new FileSpec(massageFilename(filename, pigContext, true), funcSpec), splitable);	
 		if (continuous)
 			lo.setOutputType(LogicalOperator.MONOTONE);
 		return lo;
 	} 
 }    
 
-String StringList() : {StringBuilder sb = new StringBuilder(); Token t;}
+String StringList() : {StringBuilder sb = new StringBuilder(); Token t; String arg;}
 {
 	(
+	LOOKAHEAD(2)
 	(
 	t = <QUOTEDSTRING> {sb.append(t.image);}
-	( "," t = <QUOTEDSTRING> {sb.append(",");sb.append(t.image);} )*
-	)
-	| {}
-	)
+	| 
+    t = <NUMBER> {sb.append(t.image);}
+    )
+    ( 
+        "," 
+        (
+        t = <QUOTEDSTRING> {sb.append(t.image);}
+        | 
+        t = <NUMBER> {sb.append(t.image);}
+        )
+    )*
+	| 
+	{}
+    )
 	{return sb.toString();}
 }
 

Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/ExecutableManager.java Tue Apr  1 14:22:38 2008
@@ -33,6 +33,7 @@
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.streaming.InputHandler.InputType;
 import org.apache.pig.impl.streaming.OutputHandler.OutputType;
 
@@ -74,11 +75,29 @@
 
 	Properties properties;
 
+	// Statistics
+	protected long inputRecords = 0;
 	protected long inputBytes = 0;
+	protected long outputRecords = 0;
 	protected long outputBytes = 0;
 	
+	/**
+	 * Create a new {@link ExecutableManager}.
+	 */
 	public ExecutableManager() {}
 	
+	/**
+	 * Configure and initialize the {@link ExecutableManager}.
+	 * 
+	 * @param properties {@link Properties} for the 
+	 *                   <code>ExecutableManager</code>
+	 * @param command {@link StreamingCommand} to be run by the 
+	 *                <code>ExecutableManager</code>
+	 * @param endOfPipe {@link DataCollector} to be used to push results of the
+	 *                  <code>StreamingCommand</code> down
+	 * @throws IOException
+	 * @throws ExecException
+	 */
 	public void configure(Properties properties, StreamingCommand command, 
 	                      DataCollector endOfPipe) 
 	throws IOException, ExecException {
@@ -96,51 +115,82 @@
 		this.endOfPipe = endOfPipe;
 	}
 	
+	/**
+	 * Close and cleanup the {@link ExecutableManager}.
+	 * 
+	 * @throws IOException
+	 * @throws ExecException
+	 */
 	public void close() throws IOException, ExecException {
-	    // Close the InputHandler, which in some cases lets the process
-	    // terminate
-		inputHandler.close();
-		
-		// Check if we need to start the process now ...
-		if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
-		    exec();
-		}
-		
-		// Wait for the process to exit and the stdout/stderr threads to complete
-		try {
-			exitCode = process.waitFor();
-			
-			if (stdoutThread != null) {
-			    stdoutThread.join(0);
-			}
-			if (stderrThread != null) {
-				stderrThread.join(0);
-			}
+	    try {
+	        // Close the InputHandler, which in some cases lets the process
+	        // terminate
+	        inputHandler.close();
+
+	        // Check if we need to start the process now ...
+	        if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
+	            exec();
+	        }
 
-		} catch (InterruptedException ie) {}
+	        // Wait for the process to exit 
+	        try {
+	            exitCode = process.waitFor();
+	        } catch (InterruptedException ie) {}
+
+	        // Wait for stdout thread to complete
+	        try {
+	            if (stdoutThread != null) {
+	                stdoutThread.join(0);
+	            }
+	            stdoutThread = null;
+	        } catch (InterruptedException ie) {}
+	        
+            // Wait for stderr thread to complete
+	        try {
+	            if (stderrThread != null) {
+	                stderrThread.join(0);
+	            }
+	            stderrThread = null;
+	        } catch (InterruptedException ie) {}
+
+	        // Clean up the process
+	        process.destroy();
+	        process = null;
+
+	        LOG.debug("Process exited with: " + exitCode);
+	        if (exitCode != SUCCESS) {
+	            throw new ExecException(command + " failed with exit status: " + 
+	                    exitCode);
+	        }
 
-		// Clean up the process
-		process.destroy();
-		
-        LOG.debug("Process exited with: " + exitCode);
-        if (exitCode != SUCCESS) {
-            throw new ExecException(command + " failed with exit status: " + 
-                                       exitCode);
-        }
-        
-        if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
-            // Trigger the outputHandler
-            outputHandler.bindTo(null);
+	        if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
+	            // Trigger the outputHandler
+	            outputHandler.bindTo("", null, 0, -1);
+
+	            // Start the thread to process the output and wait for
+	            // it to terminate
+	            stdoutThread = new ProcessOutputThread(outputHandler);
+	            stdoutThread.start();
+
+	            try {
+	                stdoutThread.join(0);
+	            } catch (InterruptedException ie) {}
+	        }
+	    } finally {
+	        // Cleanup, release resources ...
+	        if (process != null) {
+	            process.destroy();
+	        }
 
-            // Start the thread to process the output and wait for
-            // it to terminate
-            stdoutThread = new ProcessOutputThread(outputHandler);
-            stdoutThread.start();
-            
-            try {
-                stdoutThread.join(0);
-            } catch (InterruptedException ie) {}
-        }
+	        if (stdoutThread != null) {
+	            stdoutThread.interrupt();
+	        }
+	    
+	        if (stderrThread != null) {
+	            stderrThread.interrupt();
+	        }
+	    }
+	    
 
 	}
 
@@ -172,7 +222,8 @@
                 new DataInputStream(new BufferedInputStream(process.getInputStream()));
             
             // Bind the stdout to the OutputHandler
-            outputHandler.bindTo(stdout);
+            outputHandler.bindTo("", new BufferedPositionedInputStream(stdout), 
+                                 0, Long.MAX_VALUE);
             
             // Start the thread to process the executable's stdout
             stdoutThread = new ProcessOutputThread(outputHandler);
@@ -180,6 +231,11 @@
         }
 	}
 	
+	/**
+	 * Start execution of the {@link ExecutableManager}.
+	 * 
+	 * @throws IOException
+	 */
 	public void run() throws IOException {
 	    // Check if we need to exec the process NOW ...
 	    if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
@@ -193,11 +249,19 @@
 	    inputHandler.bindTo(stdin);
 	}
 
+	/**
+	 * Send the given {@link Datum} to the external command managed by the
+	 * {@link ExecutableManager}.
+	 * 
+	 * @param d <code>Datum</code> to be sent to the external command
+	 * @throws IOException
+	 */
 	public void add(Datum d) throws IOException {
 		// Pass the serialized tuple to the executable via the InputHandler
 	    Tuple t = (Tuple)d;
 	    inputHandler.putNext(t);
 	    inputBytes += t.getMemorySize();
+	    inputRecords++;
 	}
 
 	/**
@@ -210,6 +274,7 @@
 	 */
 	protected void processOutput(Datum d) {
 		endOfPipe.add(d);
+		outputRecords++;
 	}
 	
 	class ProcessOutputThread extends Thread {

Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/FileOutputHandler.java Tue Apr  1 14:22:38 2008
@@ -20,11 +20,11 @@
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
 import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
 
 /**
@@ -35,7 +35,7 @@
 public class FileOutputHandler extends OutputHandler {
 
     String fileName;
-    InputStream fileInStream;
+    BufferedPositionedInputStream fileInStream;
     
     public FileOutputHandler(HandleSpec handleSpec) throws ExecException {
         fileName = handleSpec.name;
@@ -47,10 +47,15 @@
         return OutputType.ASYNCHRONOUS;
     }
     
-    public void bindTo(InputStream is) throws IOException {
+    public void bindTo(String fileName, BufferedPositionedInputStream is,
+            long offset, long end) throws IOException {
         // This is a trigger to start processing the output from the file ...
-        fileInStream = new FileInputStream(new File(fileName)); 
-        super.bindTo(fileInStream);
+        // ... however, we must ignore the input parameters and use the ones
+        // provided during initialization
+        File file = new File(this.fileName);
+        this.fileInStream = 
+            new BufferedPositionedInputStream(new FileInputStream(file)); 
+        super.bindTo(this.fileName, this.fileInStream, 0, file.length());
     }
     
     public void close() throws IOException {

Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/OutputHandler.java Tue Apr  1 14:22:38 2008
@@ -18,7 +18,6 @@
 package org.apache.pig.impl.streaming;
 
 import java.io.IOException;
-import java.io.InputStream;
 
 import org.apache.pig.LoadFunc;
 import org.apache.pig.data.Tuple;
@@ -58,9 +57,10 @@
      *           of the managed process
      * @throws IOException
      */
-    public void bindTo(InputStream is) throws IOException {
-        deserializer.bindTo("", new BufferedPositionedInputStream(is), 0, 
-                            Long.MAX_VALUE);
+    public void bindTo(String fileName, BufferedPositionedInputStream is,
+                       long offset, long end) throws IOException {
+        deserializer.bindTo(fileName, new BufferedPositionedInputStream(is), 
+                            offset, end);
     }
     
     /**

Added: incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java?rev=643583&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestBinaryStorage.java Tue Apr  1 14:22:38 2008
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.PigServer;
+import org.apache.pig.PigServer.ExecType;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestBinaryStorage extends TestCase {
+    private static final String simpleEchoStreamingCommand = 
+        "perl -ne 'print \"$_\"'";
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+    private static final int MAX_DATA_SIZE = 1024;
+    
+    @Test
+    public void testBinaryStorageWithAsciiData() throws Exception {
+        // Create input file with ASCII data
+        File input = Util.createInputFile("tmp", "", 
+                new String[] {"A,1", "B,2", "C,3", "D,2",
+                              "A,5", "B,5", "C,8", "A,8",
+                              "D,8", "A,9"});
+        
+        // Test if data is handled correctly by BinaryStorage
+        testBinaryStorage(input);
+    }
+    
+    @Test
+    public void testBinaryStorageWithBinaryData() throws Exception {
+        // Create input file and fill it with random binary data
+        File input = File.createTempFile("tmp", "dat");
+        byte[] data = new byte[MAX_DATA_SIZE];
+        randomizeBytes(data, 0, data.length);
+        
+        FileOutputStream os = new FileOutputStream(input);
+        os.write(data, 0, data.length);
+        os.close();
+        
+        // Test if data is handled correctly by BinaryStorage
+        testBinaryStorage(input);
+    }
+    
+    private void testBinaryStorage(File input) 
+    throws Exception {
+        PigServer pigServer = new PigServer(ExecType.MAPREDUCE);
+
+        // Get input data 
+        byte[] inputData = new byte[MAX_DATA_SIZE];
+        int inputLen = 
+            readFileDataIntoBuffer(new FileInputStream(input), inputData);
+        
+        // Pig query to run
+        pigServer.registerQuery("DEFINE CMD `" + 
+                                simpleEchoStreamingCommand + "` " +
+                                "input(stdin using BinaryStorage()) " +
+                                "output(stdout using BinaryStorage());");
+        pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
+                                "BinaryStorage() split by 'file';");
+        pigServer.registerQuery("OP = stream IP through CMD;");
+
+        // Save the output using BinaryStorage
+        String output = "/pig/out";
+        pigServer.store("OP", output, "BinaryStorage()");
+        
+        // Get output data 
+        InputStream out = FileLocalizer.open(output, pigServer.getPigContext());
+        byte[] outputData = new byte[MAX_DATA_SIZE];
+        int outputLen = readFileDataIntoBuffer(out, outputData);
+
+        // Check if the data is the same ...
+        assertEquals(true, 
+                WritableComparator.compareBytes(inputData, 0, inputLen, 
+                                                outputData, 0, outputLen) == 0);
+        
+        // Cleanup
+        pigServer.deleteFile(output);
+    }
+
+    private static void randomizeBytes(byte[] data, int offset, int length) {
+        Random random = new Random();
+        for(int i=offset + length - 1; i >= offset; --i) {
+            data[i] = (byte) random.nextInt(256);
+        }
+    }
+
+    private static int readFileDataIntoBuffer(InputStream is, byte[] buffer) 
+    throws IOException {
+        int n = 0;
+        int off = 0, len = buffer.length;
+        while (len > 0 && (n = is.read(buffer, off, len)) != -1) {
+            off += n;
+            len -= n;
+        }
+        return off;
+    }
+}

Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=643583&r1=643582&r2=643583&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Tue Apr  1 14:22:38 2008
@@ -26,6 +26,7 @@
 import org.junit.Test;
 
 import org.apache.pig.PigServer;
+import org.apache.pig.PigServer.ExecType;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
@@ -56,8 +57,18 @@
 	}
 	
 	@Test
-	public void testSimpleMapSideStreaming() throws Exception {
-		PigServer pigServer = new PigServer(MAPREDUCE);
+	public void testLocalSimpleMapSideStreaming() throws Exception {
+	    testSimpleMapSideStreaming(ExecType.LOCAL); // delegates to the shared test body, passing ExecType.LOCAL
+	}
+	
+	@Test
+    public void testMRSimpleMapSideStreaming() throws Exception {
+        testSimpleMapSideStreaming(ExecType.MAPREDUCE); // delegates to the shared test body, passing ExecType.MAPREDUCE
+    }
+    
+	private void testSimpleMapSideStreaming(ExecType execType) 
+	throws Exception {
+		PigServer pigServer = new PigServer(execType);
 
 		File input = Util.createInputFile("tmp", "", 
 				                          new String[] {"A,1", "B,2", "C,3", "D,2",
@@ -74,7 +85,9 @@
 		pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
 				                PigStorage.class.getName() + "(',');");
 		pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
-		pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
+        pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+		pigServer.registerQuery("OP = stream S1 through `" +
 				                simpleEchoStreamingCommand + "`;");
 		
 		// Run the query and check the results
@@ -82,8 +95,20 @@
 	}
 
 	@Test
-	public void testSimpleMapSideStreamingWithOutputSchema() throws Exception {
-		PigServer pigServer = new PigServer(MAPREDUCE);
+    public void testLocalSimpleMapSideStreamingWithOutputSchema() 
+	throws Exception {
+	    testSimpleMapSideStreamingWithOutputSchema(ExecType.LOCAL); // delegates to the shared test body, passing ExecType.LOCAL
+	}
+	
+    @Test
+    public void testMRSimpleMapSideStreamingWithOutputSchema() 
+    throws Exception {
+        testSimpleMapSideStreamingWithOutputSchema(ExecType.MAPREDUCE); // delegates to the shared test body, passing ExecType.MAPREDUCE
+    }
+    
+	private void testSimpleMapSideStreamingWithOutputSchema(ExecType execType) 
+	throws Exception {
+		PigServer pigServer = new PigServer(execType);
 
 		File input = Util.createInputFile("tmp", "", 
 				                          new String[] {"A,1", "B,2", "C,3", "D,2",
@@ -109,8 +134,20 @@
 	}
 
 	@Test
-	public void testSimpleReduceSideStreamingAfterFlatten() throws Exception {
-		PigServer pigServer = new PigServer(MAPREDUCE);
+    public void testLocalSimpleReduceSideStreamingAfterFlatten() 
+	throws Exception {
+	    testSimpleReduceSideStreamingAfterFlatten(ExecType.LOCAL); // delegates to the shared test body, passing ExecType.LOCAL
+	}
+	
+    @Test
+    public void testMRSimpleReduceSideStreamingAfterFlatten() 
+    throws Exception {
+        testSimpleReduceSideStreamingAfterFlatten(ExecType.MAPREDUCE); // delegates to the shared test body, passing ExecType.MAPREDUCE
+    }
+    
+	private void testSimpleReduceSideStreamingAfterFlatten(ExecType execType) 
+	throws Exception {
+		PigServer pigServer = new PigServer(execType);
 
 		File input = Util.createInputFile("tmp", "", 
 				                          new String[] {"A,1", "B,2", "C,3", "D,2",
@@ -130,7 +167,9 @@
 		pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
 		pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
 				                "generate flatten($1);");
-		pigServer.registerQuery("OP = stream FLATTENED_GROUPED_DATA through `" +
+        pigServer.registerQuery("S1 = stream FLATTENED_GROUPED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+		pigServer.registerQuery("OP = stream S1 through `" +
 				                simpleEchoStreamingCommand + "`;");
 		
 		// Run the query and check the results
@@ -138,8 +177,20 @@
 	}
 
 	@Test
-	public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception {
-		PigServer pigServer = new PigServer(MAPREDUCE);
+    public void testLocalSimpleOrderedReduceSideStreamingAfterFlatten() 
+	throws Exception {
+	    testSimpleOrderedReduceSideStreamingAfterFlatten(ExecType.LOCAL); // delegates to the shared test body, passing ExecType.LOCAL
+	}
+	
+    @Test
+    public void testMRSimpleOrderedReduceSideStreamingAfterFlatten() 
+    throws Exception {
+        testSimpleOrderedReduceSideStreamingAfterFlatten(ExecType.MAPREDUCE); // delegates to the shared test body, passing ExecType.MAPREDUCE
+    }
+    
+	private void testSimpleOrderedReduceSideStreamingAfterFlatten(
+	        ExecType execType) throws Exception {
+		PigServer pigServer = new PigServer(execType);
 
 		File input = Util.createInputFile("tmp", "", 
 				                          new String[] {"A,1,2,3", "B,2,4,5",
@@ -169,12 +220,18 @@
 		pigServer.registerQuery("IP = load 'file:" + input + "' using " + 
 				                PigStorage.class.getName() + "(',');");
 		pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+        pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+        pigServer.registerQuery("S2 = stream S1 through `" +
+                                simpleEchoStreamingCommand + "`;");
 		pigServer.registerQuery("GROUPED_DATA = group IP by $0;");
 		pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " +
 				                "  D = order IP BY $2, $3;" +
                                 "  generate flatten(D);" +
                                 "};");
-		pigServer.registerQuery("OP = stream ORDERED_DATA through `" +
+        pigServer.registerQuery("S3 = stream ORDERED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+		pigServer.registerQuery("OP = stream S3 through `" +
 				                simpleEchoStreamingCommand + "`;");
 		
 		// Run the query and check the results
@@ -224,6 +281,7 @@
         pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
 
         String output = "/pig/out";
+        pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
         InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
@@ -237,9 +295,6 @@
 
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
-        
-        // Cleanup
-        pigServer.deleteFile(output);
     }
 
     @Test
@@ -286,6 +341,7 @@
         pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
         
         String output = "/pig/out";
+        pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
         InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
@@ -299,9 +355,6 @@
 
         // Run the query and check the results
         Util.checkQueryOutputs(outputs.iterator(), expectedResults);
-	
-        // Cleanup
-        pigServer.deleteFile(output);
     }
 
     @Test
@@ -351,6 +404,7 @@
         pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
         
         String output = "/pig/out";
+        pigServer.deleteFile(output);
         pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
         
         InputStream op = FileLocalizer.open(output, pigServer.getPigContext());