Posted to commits@pig.apache.org by ga...@apache.org on 2008/02/27 01:52:08 UTC

svn commit: r631443 [5/10] - in /incubator/pig/branches/types: ./ lib-src/bzip2/org/apache/tools/bzip2r/ lib-src/shock/org/apache/pig/shock/ lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/backend/ src/org/apache/pig/backend/datastorage/ src/org/a...

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.executionengine.ExecLogicalPlan;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.executionengine.ExecScopedLogicalOperator;
+import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
+import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.eval.cond.Cond;
+import org.apache.pig.impl.io.FileSpec;
+import java.util.Iterator;
+
+
+public class LocalExecutionEngine implements ExecutionEngine {
+
+    protected PigContext pigContext;
+    protected DataStorage ds;
+    protected NodeIdGenerator nodeIdGenerator;
+
+    // key: the operator key from the logical plan that originated the physical plan
+    // val: the operator key for the root of the physical plan
+    protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
+    
+    protected Map<OperatorKey, ExecPhysicalOperator> physicalOpTable;
+    
+    // map from LOGICAL key to info about the execution
+    protected Map<OperatorKey, LocalResult> materializedResults;
+    
+    public LocalExecutionEngine(PigContext pigContext) {
+        this.pigContext = pigContext;
+        this.ds = pigContext.getLfs();
+        this.nodeIdGenerator = NodeIdGenerator.getGenerator(); 
+        this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
+        this.physicalOpTable = new HashMap<OperatorKey, ExecPhysicalOperator>();
+        this.materializedResults = new HashMap<OperatorKey, LocalResult>();
+    }
+
+    public DataStorage getDataStorage() {
+        return this.ds;
+    }
+    
+    public void init() throws ExecException {
+        ;
+    }
+
+    public void close() throws ExecException {
+        ;
+    }
+        
+    public Properties getConfiguration() throws ExecException {
+        Properties conf = new Properties();
+        return conf;
+    }
+        
+    public void updateConfiguration(Properties newConfiguration) 
+        throws ExecException {
+        ;
+    }
+        
+    public Map<String, Object> getStatistics() throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    
+    public LocalPhysicalPlan compile(ExecLogicalPlan plan,
+                                     Properties properties)
+            throws ExecException {
+        if (plan == null) {
+            throw new ExecException("No Plan to compile");
+        }
+
+        return compile(new ExecLogicalPlan[]{ plan } , properties);
+    }
+
+    public LocalPhysicalPlan compile(ExecLogicalPlan[] plans,
+                                     Properties properties)
+            throws ExecException {
+        if (plans == null) {
+            throw new ExecException("No Plans to compile");
+        }
+
+        OperatorKey physicalKey = null;
+        for (int i = 0; i < plans.length; ++i) {
+            ExecLogicalPlan curPlan = null;
+
+            curPlan = plans[ i ];
+     
+            OperatorKey logicalKey = curPlan.getRoot();
+            
+            physicalKey = logicalToPhysicalKeys.get(logicalKey);
+            
+            if (physicalKey == null) {
+                physicalKey = doCompile(curPlan.getRoot(),
+                                        curPlan.getOpTable(),
+                                        properties);
+                
+                logicalToPhysicalKeys.put(logicalKey, physicalKey);
+            }
+        }
+        
+        return new LocalPhysicalPlan(physicalKey, physicalOpTable);
+    }
+
+    public LocalJob execute(ExecPhysicalPlan plan) throws ExecException {
+        DataBag results = BagFactory.getInstance().newDefaultBag();
+        try {
+            PhysicalOperator pp = (PhysicalOperator)physicalOpTable.get(plan.getRoot());
+
+            pp.open();
+            
+            Tuple t;
+            while ((t = (Tuple) pp.getNext()) != null) {
+                results.add(t);
+            }
+            
+            pp.close();
+        }
+        catch (IOException e) {
+            throw new ExecException(e);
+        }
+        
+        return new LocalJob(results, JOB_STATUS.COMPLETED);
+    }
+
+    public LocalJob submit(ExecPhysicalPlan plan) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {
+        return new HashSet<ExecJob>();
+    }
+    
+    public Collection<String> activeScopes() throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+    
+    public void reclaimScope(String scope) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+    
+    private OperatorKey doCompile(OperatorKey logicalKey,
+                                  Map<OperatorKey, LogicalOperator> logicalOpTable,
+                                  Properties properties) 
+            throws ExecException {
+        
+        LocalResult materializedResult = materializedResults.get(logicalKey);
+        
+        if (materializedResult != null) {
+            ExecPhysicalOperator pp = new POLoad(logicalKey.getScope(),
+                                             nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+                                             physicalOpTable,
+                                             pigContext, 
+                                             materializedResult.outFileSpec,
+                                             LogicalOperator.FIXED);
+            
+            OperatorKey ppKey = new OperatorKey(pp.getScope(), pp.getId());
+            
+            return ppKey;
+        }
+
+        OperatorKey physicalKey = new OperatorKey();
+        
+        if (compileOperator(logicalKey, logicalOpTable, properties, physicalKey)) {
+            for (int i = 0; i < logicalOpTable.get(logicalKey).getInputs().size(); ++i) {
+                ((PhysicalOperator)physicalOpTable.get(physicalKey)).inputs[i] = 
+                    doCompile(logicalOpTable.get(logicalKey).getInputs().get(i), logicalOpTable, properties);
+            }
+        }
+
+        return physicalKey;
+    }
+    
+    private boolean compileOperator(OperatorKey logicalKey, 
+                                    Map<OperatorKey, LogicalOperator> logicalOpTable,
+                                    Properties properties,
+                                    OperatorKey physicalKey) 
+            throws ExecException {
+        ExecPhysicalOperator pp;
+        LogicalOperator lo = logicalOpTable.get(logicalKey);
+        String scope = lo.getScope();
+        boolean compileInputs = true;
+        
+        if (lo instanceof LOEval) {
+            
+            pp = new POEval(scope,
+                           nodeIdGenerator.getNextNodeId(scope),
+                           physicalOpTable,
+                           ((LOEval) lo).getSpec(),
+                           lo.getOutputType());
+        } 
+        else if (lo instanceof LOCogroup) {
+            pp = new POCogroup(scope,
+                               nodeIdGenerator.getNextNodeId(scope),
+                               physicalOpTable,
+                               ((LOCogroup) lo).getSpecs(),
+                               lo.getOutputType());
+        }  
+        else if (lo instanceof LOLoad) {
+            pp = new POLoad(scope,
+                            nodeIdGenerator.getNextNodeId(scope),
+                            physicalOpTable,
+                            pigContext, 
+                            ((LOLoad)lo).getInputFileSpec(),
+                            lo.getOutputType());
+        }
+        else if (lo instanceof LOSplitOutput) {
+            LOSplitOutput loso = (LOSplitOutput)lo;
+            LOSplit los = ((LOSplit)(logicalOpTable.get(loso.getInputs().get(0))));
+            
+            pp = new POSplit(scope,
+                             nodeIdGenerator.getNextNodeId(scope),
+                             physicalOpTable,
+                             doCompile(los.getInputs().get(0),
+                                       logicalOpTable,
+                                       properties), 
+                             los.getConditions(),
+                             loso.getReadFrom(),
+                             lo.getOutputType());
+            
+            compileInputs = false;
+        }
+        else if (lo instanceof LOStore) {
+            pp = new POStore(scope,
+                             nodeIdGenerator.getNextNodeId(scope),
+                             physicalOpTable,
+                             lo.getInputs().get(0),
+                             materializedResults,
+                             ((LOStore)lo).getOutputFileSpec(),
+                             ((LOStore)lo).isAppend(),
+                             pigContext);
+        } 
+        else if (lo instanceof LOUnion) {
+            pp = new POUnion(scope,
+                             nodeIdGenerator.getNextNodeId(scope),
+                             physicalOpTable,
+                             ((LOUnion)lo).getInputs().size(),
+                             lo.getOutputType());
+        } 
+        else if (lo instanceof LOSort) {
+            pp = new POSort(scope,
+                            nodeIdGenerator.getNextNodeId(scope),
+                            physicalOpTable,
+                            ((LOSort)lo).getSortSpec(),
+                            lo.getOutputType());
+        }
+        else {
+            throw new ExecException("Internal error: Unknown logical operator.");
+        }
+        
+        physicalKey.scope = pp.getScope();
+        physicalKey.id = pp.getId();
+        
+        logicalToPhysicalKeys.put(logicalKey, physicalKey);
+        
+        return compileInputs;
+    }
+}
+
+
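
For orientation, the flow through this backend is: compile an ExecLogicalPlan into a LocalPhysicalPlan, execute it, and iterate the tuples carried by the returned LocalJob. The sketch below shows that sequence end to end; the driver class and method are illustrative only, and it assumes the caller already has a PigContext and a built ExecLogicalPlan (both come from elsewhere in Pig and are not constructed here).

    // Illustrative only: a minimal sketch of driving LocalExecutionEngine,
    // assuming a PigContext and a compiled ExecLogicalPlan are supplied by the caller.
    import java.util.Iterator;
    import java.util.Properties;

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.executionengine.ExecLogicalPlan;
    import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
    import org.apache.pig.backend.local.executionengine.LocalJob;
    import org.apache.pig.backend.local.executionengine.LocalPhysicalPlan;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.PigContext;

    public class LocalEngineSketch {
        public static void run(PigContext pigContext, ExecLogicalPlan logicalPlan)
                throws ExecException {
            LocalExecutionEngine engine = new LocalExecutionEngine(pigContext);
            engine.init();

            // compile() maps the logical root to a physical root, reusing any
            // previously materialized results it knows about.
            LocalPhysicalPlan physicalPlan = engine.compile(logicalPlan, new Properties());

            // explain() walks the physical operator tree with a POPrinter.
            physicalPlan.explain(System.out);

            // execute() pulls every tuple through the root operator and returns
            // them wrapped in a LocalJob with status COMPLETED.
            LocalJob job = engine.execute(physicalPlan);
            Iterator<Tuple> it = job.getResults();
            while (it.hasNext()) {
                System.out.println(it.next());
            }

            engine.close();
        }
    }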

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalJob.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalJob.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalJob.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalJob.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+
+public class LocalJob implements ExecJob {
+
+    protected DataBag results;
+    protected JOB_STATUS status;
+    
+    public LocalJob(DataBag results, JOB_STATUS status) {
+        this.results = results;
+        this.status = status;
+    }
+    
+    public JOB_STATUS getStatus() {
+        return status;
+    }
+    
+    public boolean hasCompleted() throws ExecException {
+        return true;
+    }
+    
+    public Iterator<Tuple> getResults() throws ExecException {
+        return this.results.iterator();
+    }
+
+    public Properties getContiguration() {
+        Properties props = new Properties();
+        return props;
+    }
+
+    public Map<String, Object> getStatistics() {
+        throw new UnsupportedOperationException();
+    }
+
+    public void completionNotification(Object cookie) {
+        throw new UnsupportedOperationException();
+    }
+    
+    public void kill() throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+    
+    public void getLogs(OutputStream log) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+    
+    public void getSTDOut(OutputStream out) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+    
+    public void getSTDError(OutputStream error) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.physicalLayer.POPrinter;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+public class LocalPhysicalPlan implements ExecPhysicalPlan {
+    private static final long serialVersionUID = 1;
+    
+    protected OperatorKey root;
+    protected Map<OperatorKey, ExecPhysicalOperator> opTable;
+    
+    LocalPhysicalPlan(OperatorKey root,
+                      Map<OperatorKey, ExecPhysicalOperator> opTable) {
+        this.root = root;
+        this.opTable = opTable;
+    }
+    
+    public Properties getConfiguration() {
+        throw new UnsupportedOperationException();
+    }
+
+    public void updateConfiguration(Properties configuration)
+        throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+             
+    public void explain(OutputStream out) {
+        POVisitor lprinter = new POPrinter(opTable, new PrintStream(out));
+        
+        ((PhysicalOperator)opTable.get(root)).visit(lprinter);
+    }
+    
+    public Map<OperatorKey, ExecPhysicalOperator> getOpTable() {
+        return opTable;
+    }
+    
+    public OperatorKey getRoot() {
+        return root;
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalResult.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalResult.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalResult.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalResult.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.util.Iterator;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileSpec;
+
+public class LocalResult {
+    // could record other information like when file was first
+    // created, how many times it's been re-used (in this context we
+    // are inside the same scope)
+    // ...
+    //
+    public FileSpec outFileSpec;
+    
+    public LocalResult(FileSpec outFileSpec) {
+        this.outFileSpec = outFileSpec;
+    }
+}
+

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POCogroup.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POCogroup.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POCogroup.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.AmendableTuple;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+
+// n-ary, blocking operator. Output has schema: < group_label, { <1>, <2>, ... }, { <a>, <b>, ... } >
+public class POCogroup extends PhysicalOperator {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    List<Object[]>[] sortedInputs;
+    List<EvalSpec>       specs;
+        
+    public POCogroup(String scope, 
+                     long id,
+                     Map<OperatorKey, ExecPhysicalOperator> opTable,
+                     List<EvalSpec> specs, 
+                     int outputType) {
+        super(scope, id, opTable, outputType);
+        this.inputs = new OperatorKey[specs.size()];
+        this.specs = specs;
+    }
+
+    // drain all inputs, and sort each by group (remember, this is a blocking operator!)
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean open() throws IOException {
+        if (!super.open())
+            return false;
+
+        sortedInputs = new ArrayList[inputs.length];
+
+        for (int i = 0; i < inputs.length; i++) {
+            
+            final int finalI = i;
+            sortedInputs[i] = new ArrayList<Object[]>();
+            
+            DataCollector outputFromSpec = new DataCollector(null){
+                @Override
+                public void add(Object d) {
+                    sortedInputs[finalI].add(LOCogroup.getGroupAndTuple(d));
+                }
+            };
+            
+            DataCollector inputToSpec = specs.get(i).setupPipe(outputFromSpec);
+
+            Tuple t;            
+            while ((t = (Tuple) ((PhysicalOperator)opTable.get(inputs[i])).getNext()) != null) {
+                inputToSpec.add(t);
+            }
+            inputToSpec.finishPipe();
+
+            Collections.sort(sortedInputs[i], new Comparator<Object[]>() {
+                public int compare(Object[] a, Object[] b) {
+                    return DataType.compare(a[0], b[0]);
+                }
+            });
+        }
+
+        return true;
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        
+        while (true) { // loop until we find a tuple we're allowed to output (or we hit the end)
+
+            // find the smallest group among all inputs (this is the group we should make a tuple
+            // out of)
+            Object smallestGroup = null;
+            for (int i = 0; i < inputs.length; i++) {
+                if (sortedInputs[i].size() > 0) {
+                    Object g = (sortedInputs[i].get(0))[0];
+                    if (smallestGroup == null || DataType.compare(g, smallestGroup)<0)
+                        smallestGroup = g;
+                }
+            }
+
+            if (smallestGroup == null)
+                return null; // we hit the end of the groups, so we're done
+
+            // find all tuples in each input pertaining to the group of interest, and combine the
+            // data into a single tuple
+            
+            Tuple output;
+            if (outputType == LogicalOperator.AMENDABLE) output = new AmendableTuple(1 + inputs.length, smallestGroup);
+            else output = TupleFactory.getInstance().newTuple(1 + inputs.length);
+
+            // set first field to the group tuple
+            output.set(0, smallestGroup);
+            
+            if (lineageTracer != null) lineageTracer.insert(output);
+
+            boolean done = true;
+            for (int i = 0; i < inputs.length; i++) {
+                DataBag b = BagFactory.getInstance().newDefaultBag();
+
+                while (sortedInputs[i].size() > 0) {
+                    Object g = sortedInputs[i].get(0)[0];
+
+                    Tuple t = (Tuple) sortedInputs[i].get(0)[1];
+
+                    int c = DataType.compare(g, smallestGroup);
+                    if (c < 0) {
+                        sortedInputs[i].remove(0); // discard this tuple
+                    } else if (c == 0) {
+                        b.add(t);
+                        if (lineageTracer != null) lineageTracer.union(t, output);   // update lineage
+                        sortedInputs[i].remove(0);
+                    } else {
+                        break;
+                    }
+                }
+
+                if (specs.get(i).isInner() && b.size() == 0)
+                    done = false; // this input uses "inner" semantics, and it has no tuples for
+                                    // this group, so suppress the tuple we're currently building
+
+                output.set(1 + i, b);
+            }
+
+            if (done)
+                return output;
+        }
+
+    }
+
+    public void visit(POVisitor v) {
+        v.visitCogroup(this);
+    }
+
+}
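
POCogroup.getNext() is a k-way merge over the per-input lists that open() sorted by group: each round it finds the smallest group key still present on any input, pulls that group's tuples from every input into one bag per input, and emits <group, bag1, bag2, ...>. The self-contained sketch below mirrors that merge with plain Java collections (strings as group keys, integers as tuples, names illustrative); the real operator additionally compares with DataType.compare(), tracks lineage, and suppresses the output when an input with "inner" semantics contributed no tuples.

    // Illustrative sketch of the smallest-group merge used by POCogroup.getNext(),
    // stripped down to plain Java collections.
    import java.util.*;

    public class CogroupMergeSketch {
        // sortedInputs.get(i) plays the role of sortedInputs[i] in POCogroup:
        // (group, value) pairs already sorted by group.
        public static List<Object[]> merge(List<Deque<Map.Entry<String, Integer>>> sortedInputs) {
            List<Object[]> results = new ArrayList<>();
            while (true) {
                // 1. find the smallest group key still present on any input
                String smallest = null;
                for (Deque<Map.Entry<String, Integer>> in : sortedInputs) {
                    if (!in.isEmpty()) {
                        String g = in.peekFirst().getKey();
                        if (smallest == null || g.compareTo(smallest) < 0) smallest = g;
                    }
                }
                if (smallest == null) return results;   // all inputs drained

                // 2. build one output "tuple": the group followed by one bag per input
                Object[] output = new Object[1 + sortedInputs.size()];
                output[0] = smallest;
                for (int i = 0; i < sortedInputs.size(); i++) {
                    List<Integer> bag = new ArrayList<>();
                    Deque<Map.Entry<String, Integer>> in = sortedInputs.get(i);
                    while (!in.isEmpty() && in.peekFirst().getKey().equals(smallest)) {
                        bag.add(in.pollFirst().getValue());
                    }
                    output[1 + i] = bag;
                }
                results.add(output);
            }
        }
    }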

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POEval.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POEval.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POEval.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.util.DataBuffer;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+// unary, non-blocking operator.
+public class POEval extends PhysicalOperator {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    EvalSpec spec;
+    private DataBuffer buf = null;
+    private DataCollector evalPipeline = null;
+    private boolean inputDrained;
+    Tuple lastAdded = null;   // for lineage bookkeeping
+
+    public POEval(String scope, 
+                  long id, 
+                  Map<OperatorKey, ExecPhysicalOperator> opTable,
+                  OperatorKey input, 
+                  EvalSpec specIn, 
+                  int outputType) {
+        super(scope, id, opTable, outputType);
+        inputs = new OperatorKey[1];
+        inputs[0] = input;
+
+        spec = specIn;
+    }
+
+    public POEval(String scope, 
+                  long id, 
+                  Map<OperatorKey, ExecPhysicalOperator> opTable,
+                  EvalSpec specIn, 
+                  int outputType) {
+        super(scope, id, opTable, outputType);
+        inputs = new OperatorKey[1];
+        inputs[0] = null;
+
+        spec = specIn;
+    }
+
+    @Override
+    public boolean open() throws IOException {
+        if (!super.open()) return false;
+    
+        if (buf==null)
+            buf = new DataBuffer();
+        if (evalPipeline == null)
+            evalPipeline = spec.setupPipe(buf);
+            
+        inputDrained = false;
+        
+        return true;
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        while (true) {            
+            if (buf.isEmpty()){
+                // no pending outputs, so look to input to provide more food
+                
+                if (inputDrained){
+                    return null;
+                }
+                                    
+                Tuple nextTuple = ((PhysicalOperator)opTable.get(inputs[0])).getNext();
+                
+                if (nextTuple == null){
+                    inputDrained = true;
+                    evalPipeline.finishPipe();
+                }else{
+                    evalPipeline.add(nextTuple);
+                    lastAdded = nextTuple;   // for lineage bookkeeping
+                }
+            }else{
+                Tuple output = (Tuple)buf.removeFirst();
+                if (lineageTracer != null) {
+                    lineageTracer.insert(output);
+                    if (lastAdded != null) lineageTracer.union(lastAdded, output);   // update lineage (assumes one-to-many relationship between tuples added to pipeline and output!!)
+                }
+                return output;
+            }            
+        }
+    }
+       
+    public void visit(POVisitor v) {
+        v.visitEval(this);
+    }
+
+}
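
POEval adapts a push-style EvalSpec pipeline to the pull-style getNext() contract: when its DataBuffer is empty it pulls one tuple from the input and pushes it through the pipeline (which may emit zero, one, or many buffered outputs), and once the input is drained it flushes the pipeline and keeps serving whatever remains buffered. A minimal sketch of that adapter, with a hypothetical flatMap-style function standing in for the EvalSpec pipe:

    // Illustrative sketch of the pull/push adapter in POEval.getNext(): a
    // DataBuffer-like deque collects pipeline output, and getNext() refills it
    // one input record at a time.
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Function;

    public class EvalAdapterSketch<I, O> {
        private final Iterator<I> input;                 // stands in for the upstream operator
        private final Function<I, List<O>> pipeline;     // stands in for the EvalSpec pipe
        private final Deque<O> buf = new ArrayDeque<>(); // stands in for DataBuffer
        private boolean inputDrained = false;

        public EvalAdapterSketch(Iterator<I> input, Function<I, List<O>> pipeline) {
            this.input = input;
            this.pipeline = pipeline;
        }

        public O getNext() {
            while (true) {
                if (buf.isEmpty()) {
                    if (inputDrained) return null;       // nothing buffered, nothing left to pull
                    if (input.hasNext()) {
                        buf.addAll(pipeline.apply(input.next()));  // push one record through the pipe
                    } else {
                        inputDrained = true;             // the real code calls finishPipe() here
                    }
                } else {
                    return buf.removeFirst();            // serve pending output first
                }
            }
        }
    }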

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POLoad.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POLoad.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POLoad.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+
+public class POLoad extends PhysicalOperator {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    String   filename;
+    LoadFunc lf;
+    boolean bound = false;
+    PigContext pigContext;
+
+    public POLoad(String scope, 
+                  long id, 
+                  Map<OperatorKey, ExecPhysicalOperator> opTable,
+                  PigContext pigContext, 
+                  FileSpec fileSpec, 
+                  int outputType) {
+        super(scope, id, opTable, outputType);
+        inputs = new OperatorKey[0];
+
+        filename = fileSpec.getFileName();
+        try{
+            lf = (LoadFunc) PigContext.instantiateFuncFromSpec(fileSpec.getFuncSpec());
+        }catch(Exception e){
+            throw new RuntimeException(e);
+        }
+
+        this.pigContext = pigContext;
+    }
+
+    @Override
+    public boolean open() throws IOException {
+        if (!bound){
+            lf.bindTo(filename, new BufferedPositionedInputStream(FileLocalizer.open(filename, pigContext)), 0, Long.MAX_VALUE);
+            bound = true;
+        }
+        return true;
+    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+        bound = false;
+    }
+    
+    @Override
+    public Tuple getNext() throws IOException {
+        return lf.getNext();
+    }
+
+    @Override
+    public void visit(POVisitor v) {
+        v.visitLoad(this);
+   }
+
+}

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSort.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSort.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSort.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+
+public class POSort extends PhysicalOperator {
+    static final long serialVersionUID = 1L; 
+    EvalSpec sortSpec;
+    transient Iterator<Tuple> iter;
+    
+    
+    public POSort(String scope, 
+                  long id, 
+                  Map<OperatorKey, ExecPhysicalOperator> opTable, 
+                  EvalSpec sortSpec, 
+                  int outputType) {
+        super(scope, id, opTable, outputType);
+        this.sortSpec = sortSpec;
+        this.inputs = new OperatorKey[1];
+    }
+
+    @Override
+    public boolean open() throws IOException {
+        if (!super.open())
+            return false;
+        DataBag bag = BagFactory.getInstance().newSortedBag(sortSpec);
+        
+        Tuple t;
+        while((t = ((PhysicalOperator)opTable.get(inputs[0])).getNext())!=null){
+            bag.add(t);
+        }
+        iter = bag.iterator();
+        return true;
+    }
+    
+    @Override
+    public Tuple getNext() throws IOException {
+        if (iter.hasNext())
+            return iter.next();
+        else
+            return null;
+    }
+
+    @Override
+    public void visit(POVisitor v) {
+        v.visitSort(this);
+    }
+
+}
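
POSort is the simplest blocking operator here: open() drains the entire input into a sorted DataBag and getNext() just walks the resulting iterator. A stripped-down sketch of that pattern (a PriorityQueue stands in for the sorted bag; names are illustrative):

    // Illustrative sketch of the blocking-sort pattern in POSort: drain the
    // whole input up front, then stream it back out in sort order.
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.PriorityQueue;

    public class BlockingSortSketch<T> {
        private final PriorityQueue<T> sorted;

        public BlockingSortSketch(Iterator<T> input, Comparator<T> order) {
            sorted = new PriorityQueue<>(order);
            while (input.hasNext()) {        // "open": drain everything up front
                sorted.add(input.next());
            }
        }

        public T getNext() {                 // "getNext": emit in order, null when done
            return sorted.poll();
        }
    }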

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSplit.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSplit.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSplit.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.cond.Cond;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+public class POSplit extends PhysicalOperator {
+    static final long serialVersionUID = 1L; 
+    protected ArrayList<Cond> conditions;
+    protected int readFromCond;
+
+    public POSplit(String scope, 
+                   long id, 
+                   Map<OperatorKey, ExecPhysicalOperator> opTable, 
+                   OperatorKey input, 
+                   ArrayList<Cond> conditions,
+                   int readFromCond,
+                   int outputType){
+        super(scope, id, opTable, outputType);
+        
+        this.inputs = new OperatorKey[1];
+        this.inputs[0] = input;
+        
+        this.conditions = conditions;
+        this.readFromCond = readFromCond;
+    }
+    
+    @Override
+    public boolean open() throws IOException{
+        if (!super.open()){
+            return false;
+        }
+
+        return ((PhysicalOperator)opTable.get(this.inputs[0])).open();
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        Tuple nextTuple = null;
+        
+        while ((nextTuple = ((PhysicalOperator)opTable.get(this.inputs[ 0 ])).getNext()) != null) {
+            boolean emitTuple = this.conditions.get(this.readFromCond).eval(nextTuple);
+            
+            if (emitTuple) {
+                break;
+            }
+        }
+
+        return nextTuple;
+    }
+
+    public void visit(POVisitor v) {
+        v.visitSplit(this);
+    }
+}

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POStore.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POStore.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POStore.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.PigFile;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+
+
+public class POStore extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;
+    
+    private final Log log = LogFactory.getLog(getClass());
+    
+    private PigFile f;
+    private String funcSpec;
+    boolean append = false;
+    PigContext pigContext;
+    FileSpec outFileSpec;
+    OperatorKey logicalKey;
+    Map<OperatorKey, LocalResult> materializedResults;
+
+    public POStore(String scope, 
+                   long id, 
+                   Map<OperatorKey, ExecPhysicalOperator> opTable,
+                   OperatorKey logicalKey,
+                   Map<OperatorKey, LocalResult> materializedResults,
+                   OperatorKey input, 
+                   FileSpec outputFileSpec, 
+                   boolean append, 
+                   PigContext pigContext) {
+        super(scope, id, opTable, LogicalOperator.FIXED);
+        funcSpec = outputFileSpec.getFuncSpec();
+        inputs = new OperatorKey[1];
+        inputs[0] = input;
+        System.out.println("Creating " + outputFileSpec.getFileName());
+        f = new PigFile(outputFileSpec.getFileName(), append);
+        this.append = append;
+        this.pigContext = pigContext;
+        this.outFileSpec = outputFileSpec;
+        this.logicalKey = logicalKey;
+        this.materializedResults = materializedResults;
+    }
+
+    public POStore(String scope, 
+                   long id, 
+                   Map<OperatorKey, ExecPhysicalOperator> opTable,
+                   OperatorKey logicalKey,
+                   Map<OperatorKey, LocalResult> materializedResults,
+                   FileSpec outputFileSpec, 
+                   boolean append, 
+                   PigContext pigContext) {
+        super(scope, id, opTable, LogicalOperator.FIXED);
+        inputs = new OperatorKey[1];
+        inputs[0] = null;
+        funcSpec = outputFileSpec.getFuncSpec();
+        f = new PigFile(outputFileSpec.getFileName(), append);
+        this.append = append;
+        this.pigContext = pigContext;
+        this.outFileSpec = outputFileSpec;
+        this.logicalKey = logicalKey;
+        this.materializedResults = materializedResults;
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        // get all tuples from input, and store them.
+        DataBag b = BagFactory.getInstance().newDefaultBag();
+        Tuple t;
+        while ((t = (Tuple) ((PhysicalOperator)opTable.get(inputs[0])).getNext()) != null) {
+            b.add(t);
+        }
+        try {
+            StoreFunc func = (StoreFunc) PigContext.instantiateFuncFromSpec(funcSpec);
+            f.store(b, func, pigContext);
+            
+            // a result has materialized, track it!
+            LocalResult materializedResult = new LocalResult(this.outFileSpec);
+
+            materializedResults.put(logicalKey, materializedResult);
+        } catch(IOException e) {
+            throw e;
+        } catch(Exception e) {
+            IOException ne = new IOException(e.getClass().getName() + ": " + e.getMessage());
+            ne.setStackTrace(e.getStackTrace());
+            throw ne;
+        }
+
+        return null;
+    }
+    
+    @Override
+    public int getOutputType(){
+        log.error("No one should be asking my output type");
+        RuntimeException runtimeException = new RuntimeException();
+        log.error(runtimeException);
+        throw runtimeException;
+    }
+
+    @Override
+    public void visit(POVisitor v) {
+        v.visitStore(this);
+    }
+
+}
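
Besides writing the bag through the configured StoreFunc, POStore records the output FileSpec in materializedResults under the originating logical key; LocalExecutionEngine.doCompile() checks that map first and, when it hits, compiles the logical node into a POLoad over the already-written file instead of recompiling the subtree. The sketch below illustrates that store-and-remember pattern in generic form, not the exact Pig control flow (keys, file handling, and names are all hypothetical):

    // Illustrative sketch of the memoization handshake between POStore and
    // doCompile(): store once, remember where the result landed, reload later.
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Supplier;

    public class MaterializedResultsSketch {
        private final Map<String, Path> materialized = new HashMap<>();  // logical key -> stored file

        // "compile" side: prefer a load of the materialized file when one exists
        public List<String> resolve(String logicalKey, Supplier<List<String>> compute) throws IOException {
            Path cached = materialized.get(logicalKey);
            if (cached != null) {
                return Files.readAllLines(cached);     // analogous to emitting a POLoad
            }
            List<String> result = compute.get();       // analogous to compiling the subtree
            store(logicalKey, result);
            return result;
        }

        // "store" side: write the result out and record where it went
        private void store(String logicalKey, List<String> result) throws IOException {
            Path out = Files.createTempFile("materialized-", ".txt");
            Files.write(out, result);
            materialized.put(logicalKey, out);         // analogous to materializedResults.put()
        }
    }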

Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POUnion.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POUnion.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POUnion.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+
+
+public class POUnion extends PhysicalOperator {
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 1L;
+    int currentInput;
+
+    public POUnion(String scope, 
+                   long id, 
+                   Map<OperatorKey, ExecPhysicalOperator> opTable, 
+                   OperatorKey[] inputsIn, 
+                   int outputType) {
+        super(scope, id, opTable, outputType);
+        inputs = inputsIn;
+        currentInput = 0;
+    }
+
+    public POUnion(String scope, 
+                   long id, 
+                   Map<OperatorKey, ExecPhysicalOperator> opTable, 
+                   int numInputs, 
+                   int outputType) {
+        super(scope, id, opTable, outputType);
+        inputs = new OperatorKey[numInputs];
+        for (int i = 0; i < inputs.length; i++)
+            inputs[i] = null;
+    }
+    
+    @Override
+    public boolean open() throws IOException{
+        if (!super.open()){
+            return false;
+        }
+        currentInput = 0;
+        return true;
+    }
+
+    @Override
+    public Tuple getNext() throws IOException {
+        while (currentInput < inputs.length) {
+            Tuple t = ((PhysicalOperator)opTable.get(inputs[currentInput])).getNext();
+
+            if (t == null) {
+                currentInput++;
+                continue;
+            } else {
+                Tuple output = t;
+                if (lineageTracer != null) {
+                    lineageTracer.insert(output);     // update lineage (this line is needed, to generate the correct counts)
+                    lineageTracer.union(t, output);   // (this line amounts to a no-op)
+                }
+                return output;
+            }
+        }
+
+        return null;
+    }
+
+    public void visit(POVisitor v) {
+        v.visitUnion(this);
+    }
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java Tue Feb 26 16:51:49 2008
@@ -47,12 +47,12 @@
     public BinStorage() {
     }
 
-	public Tuple getNext() throws IOException {
+    public Tuple getNext() throws IOException {
         
         byte b = 0;
 //      skip to next record
         while (true) {
-        	if (in == null || in.getPosition() >=end) {
+            if (in == null || in.getPosition() >=end) {
                 return null;
             }
             b = (byte) in.read();
@@ -75,7 +75,7 @@
         return (Tuple)DataReaderWriter.readDatum(inData);
     }
 
-	public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
+    public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
         this.in = in;
         inData = new DataInputStream(in);
         this.end = end;

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Tue Feb 26 16:51:49 2008
@@ -29,6 +29,7 @@
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
 
 /**
  * Generates the count of the values of the first field of a tuple. This class is Algebraic in
@@ -99,7 +100,7 @@
                 // just too much.
                 sum += (Long)t.get(0);
             } catch (NumberFormatException exp) {
-                throw new IOException(exp.getClass().getName() + ":" + exp.getMessage());
+                throw WrappedIOException.wrap(exp.getClass().getName() + ":" + exp.getMessage(), exp);
             }
         }
         return sum;

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java Tue Feb 26 16:51:49 2008
@@ -28,6 +28,7 @@
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
 
 
 /**

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java Tue Feb 26 16:51:49 2008
@@ -28,6 +28,7 @@
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
 
 
 /**

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/PigDump.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigDump.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/PigDump.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigDump.java Tue Feb 26 16:51:49 2008
@@ -26,21 +26,21 @@
 
 public class PigDump implements StoreFunc {
 
-	public static String recordDelimiter = "\n";
-	
+    public static String recordDelimiter = "\n";
+    
 
     OutputStream os;
 
-	public void bindTo(OutputStream os) throws IOException {
-		this.os = os;
-	}
+    public void bindTo(OutputStream os) throws IOException {
+        this.os = os;
+    }
 
-	public void finish() throws IOException {
-		
-	}
+    public void finish() throws IOException {
+        
+    }
 
-	public void putNext(Tuple f) throws IOException {
-		os.write((f.toString() + recordDelimiter).getBytes());
-	}
+    public void putNext(Tuple f) throws IOException {
+        os.write((f.toString() + recordDelimiter).getBytes());
+    }
 
 }

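PigDump is the simplest StoreFunc touched here: bindTo() remembers the output stream, putNext() writes each tuple's toString() plus the record delimiter, and finish() has nothing to clean up. A hypothetical stand-alone driver, shown only to make the call sequence concrete (the tuple values are invented):

import java.io.ByteArrayOutputStream;

import org.apache.pig.builtin.PigDump;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class PigDumpDriver {
    public static void main(String[] args) throws Exception {
        PigDump dump = new PigDump();
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        dump.bindTo(out);                    // attach the output stream
        Tuple t = TupleFactory.getInstance().newTuple(2);
        t.set(0, "hello");
        t.set(1, "world");
        dump.putNext(t);                     // writes t.toString() + "\n"
        dump.finish();                       // no-op for PigDump

        System.out.print(out.toString());    // something like (hello,world)
    }
}
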
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Tue Feb 26 16:51:49 2008
@@ -17,16 +17,10 @@
  */
 package org.apache.pig.builtin;
 
-import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Iterator;
 
 import org.apache.pig.LoadFunc;
 import org.apache.pig.StoreFunc;

Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java Tue Feb 26 16:51:49 2008
@@ -20,6 +20,7 @@
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.io.IOException;
+import java.nio.charset.Charset;
 
 import org.apache.pig.LoadFunc;
 import org.apache.pig.data.Tuple;
@@ -32,14 +33,13 @@
  * contains the line of text.
  */
 public class TextLoader implements LoadFunc{
-	BufferedPositionedInputStream in;
-	private BufferedReader inData = null;
-	long                end;
+    BufferedPositionedInputStream in;
+    final private static Charset utf8 = Charset.forName("UTF8");
+    long end;
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
 
-	public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
+    public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
         this.in = in;
-        inData = new BufferedReader(new InputStreamReader(in, "UTF8"));
         this.end = end;
        // Since we are not block aligned, we throw away the first
        // record and count on a different instance to read it
@@ -47,11 +47,11 @@
             getNext();
     }
 
-	public Tuple getNext() throws IOException {
+    public Tuple getNext() throws IOException {
         if (in == null || in.getPosition() > end)
             return null;
         String line;
-        if ((line = inData.readLine()) != null) {
+        if ((line = in.readLine(utf8, (byte)'\n')) != null) {
             return mTupleFactory.newTuple(new String(line));
         }
         return null;

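TextLoader now reads lines straight off the BufferedPositionedInputStream with readLine(utf8, (byte)'\n') instead of layering a BufferedReader on top, presumably because a BufferedReader's read-ahead can consume bytes past the position the loader checks against end. The semantics that call relies on can be sketched roughly as follows; this illustrates the idea only and is not the actual BufferedPositionedInputStream code:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;

public class ReadLineSketch {
    // Read bytes up to (but not including) the delimiter and decode them
    // with the given charset; return null once the stream is exhausted.
    public static String readLine(InputStream in, Charset charset, byte delimiter)
            throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1 && b != (delimiter & 0xff)) {
            buf.write(b);
        }
        if (b == -1 && buf.size() == 0) {
            return null;
        }
        return new String(buf.toByteArray(), charset);
    }
}
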
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java Tue Feb 26 16:51:49 2008
@@ -31,7 +31,6 @@
 import org.apache.hadoop.io.WritableComparable;
 
 import org.apache.pig.impl.util.Spillable;
-import org.apache.pig.impl.mapreduceExec.PigMapReduce;
 
 /**
  * A collection of Tuples.  A DataBag may or may not fit into memory.

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java Tue Feb 26 16:51:49 2008
@@ -29,7 +29,7 @@
 import java.util.ArrayList;
 
 import org.apache.pig.impl.util.Spillable;
-import org.apache.pig.impl.mapreduceExec.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
 
 /**
  * A collection of Tuples.  A DataBag may or may not fit into memory.

Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java Tue Feb 26 16:51:49 2008
@@ -17,17 +17,18 @@
  */
 package org.apache.pig.data;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.io.DataOutputStream;
-import java.io.DataInputStream;
 import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.io.FileNotFoundException;
 
-import org.apache.pig.impl.util.PigLogger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 
 /**
@@ -39,6 +40,8 @@
 
     private static TupleFactory gTupleFactory = TupleFactory.getInstance();
 
+    private final Log log = LogFactory.getLog(getClass());
+ 
     public DefaultDataBag() {
         mContents = new ArrayList<Tuple>();
     }
@@ -67,13 +70,22 @@
         // trying to read while I'm mucking with the container.
         long spilled = 0;
         synchronized (mContents) {
+            DataOutputStream out = null;
+            try {
+                out = getSpillFile();
+            }  catch (IOException ioe) {
+                // Do not remove the last file from the spill files array. It was
+                // never added, because File.createTempFile threw an IOException
+                log.error(
+                    "Unable to create tmp file to spill to disk", ioe);
+                return 0;
+            }
             try {
-                DataOutputStream out = getSpillFile();
                 Iterator<Tuple> i = mContents.iterator();
                 while (i.hasNext()) {
                     i.next().write(out);
                     spilled++;
-                    // This will report progress every 16383 records.
+                    // This will report progress every 16384 records.
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
@@ -81,9 +93,17 @@
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
-                PigLogger.getLogger().error(
+                log.error(
                     "Unable to spill contents to disk", ioe);
                 return 0;
+            } finally {
+                if (out != null) {
+                    try {
+                        out.close();
+                    } catch (IOException e) {
+                        log.error("Error closing spill", e);
+                    }
+                }
             }
             mContents.clear();
         }
@@ -160,7 +180,7 @@
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    PigLogger.getLogger().fatal(
+                    log.fatal(
                         "Unable to find our spill file", fnfe);
                     throw new RuntimeException(fnfe);
                 }
@@ -171,11 +191,11 @@
                     } catch (EOFException eof) {
                         // This should never happen, it means we
                         // didn't dump all of our tuples to disk.
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Ran out of tuples too soon.", eof);
-                        throw new RuntimeException("Ran out of tuples to read prematurely.");
+                        throw new RuntimeException("Ran out of tuples to read prematurely.", eof);
                     } catch (IOException ioe) {
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Unable to read our spill file", ioe);
                         throw new RuntimeException(ioe);
                     }
@@ -205,7 +225,7 @@
                     // Fall through to the next case where we find the
                     // next file, or go to memory
                 } catch (IOException ioe) {
-                    PigLogger.getLogger().fatal(
+                    log.fatal(
                         "Unable to read our spill file", ioe);
                     throw new RuntimeException(ioe);
                 }
@@ -234,7 +254,7 @@
             } catch (FileNotFoundException fnfe) {
                 // We can't find our own spill file?  That should never
                 // happen.
-                PigLogger.getLogger().fatal("Unable to find our spill file",
+                log.fatal("Unable to find our spill file",
                     fnfe);
                 throw new RuntimeException(fnfe);
             }

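The spill() changes in DefaultDataBag (and the matching ones in DistinctDataBag and SortedDataBag below) share one shape: obtain the spill stream before the main try block, so a failed create is handled without removing a file that was never registered, and close the stream in a finally block so an error cannot leak it. A condensed stand-alone sketch of that shape, using invented names (SpillSketch, records) rather than Pig's classes:

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SpillSketch {
    private final List<File> spillFiles = new ArrayList<File>();

    public long spill(List<Long> records) {
        DataOutputStream out = null;
        try {
            // Mirrors getSpillFile(): create the temp file and register it
            // only once the stream has been opened successfully.
            File f = File.createTempFile("pigbag", null);
            out = new DataOutputStream(new FileOutputStream(f));
            spillFiles.add(f);
        } catch (IOException ioe) {
            // Nothing was registered on failure, so there is nothing to undo.
            System.err.println("Unable to create tmp file to spill to disk: " + ioe);
            return 0;
        }
        long spilled = 0;
        try {
            for (Long r : records) {
                out.writeLong(r);
                spilled++;
            }
            out.flush();
        } catch (IOException ioe) {
            // Writing failed: drop the file registered just above.
            spillFiles.remove(spillFiles.size() - 1);
            System.err.println("Unable to spill contents to disk: " + ioe);
            return 0;
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                System.err.println("Error closing spill: " + e);
            }
        }
        return spilled;
    }
}
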
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java Tue Feb 26 16:51:49 2008
@@ -17,26 +17,25 @@
  */
 package org.apache.pig.data;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.ListIterator;
-import java.util.TreeSet;
-import java.util.Arrays;
 import java.io.BufferedInputStream;
-import java.io.DataOutputStream;
 import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.TreeSet;
 
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.util.PigLogger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 
 
@@ -51,6 +50,9 @@
  * found to be faster than storing it in a TreeSet.
  */
 public class DistinctDataBag extends DefaultAbstractBag {
+
+    private final Log log = LogFactory.getLog(getClass());
+
     private static TupleFactory gTupleFactory = TupleFactory.getInstance();
 
     public DistinctDataBag() {
@@ -104,8 +106,17 @@
         // trying to read while I'm mucking with the container.
         long spilled = 0;
         synchronized (mContents) {
+            DataOutputStream out = null;
+            try {
+                out = getSpillFile();
+            }  catch (IOException ioe) {
+                // Do not remove the last file from the spill files array. It was
+                // never added, because File.createTempFile threw an IOException
+                log.error(
+                    "Unable to create tmp file to spill to disk", ioe);
+                return 0;
+            }
             try {
-                DataOutputStream out = getSpillFile();
                 // If we've already started reading, then it will already be
                 // sorted into an array list.  If not, we need to sort it
                 // before writing.
@@ -133,9 +144,17 @@
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
-                PigLogger.getLogger().error(
+                log.error(
                     "Unable to spill contents to disk", ioe);
                 return 0;
+            } finally {
+                if (out != null) {
+                    try {
+                        out.close();
+                    } catch (IOException e) {
+                        log.error("Error closing spill", e);
+                    }
+                }
             }
             mContents.clear();
         }
@@ -241,7 +260,7 @@
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    PigLogger.getLogger().fatal(
+                    log.fatal(
                         "Unable to find our spill file", fnfe);
                     throw new RuntimeException(fnfe);
                 }
@@ -255,9 +274,9 @@
                     } catch (EOFException eof) {
                         // This should never happen, it means we
                         // didn't dump all of our tuples to disk.
-                        throw new RuntimeException("Ran out of tuples to read prematurely.");
+                        throw new RuntimeException("Ran out of tuples to read prematurely.", eof);
                     } catch (IOException ioe) {
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Unable to read our spill file", ioe);
                         throw new RuntimeException(ioe);
                     }
@@ -302,7 +321,7 @@
                     } catch (FileNotFoundException fnfe) {
                         // We can't find our own spill file?  That should
                         // never happen.
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Unable to find our spill file.", fnfe);
                         throw new RuntimeException(fnfe);
                     }
@@ -377,7 +396,7 @@
                         mStreams.set(fileNum, null);
                         return;
                     } catch (IOException ioe) {
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Unable to read our spill file", ioe);
                         throw new RuntimeException(ioe);
                     }
@@ -444,7 +463,7 @@
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                             // never happen.
-                            PigLogger.getLogger().fatal(
+                            log.fatal(
                                 "Unable to find our spill file.", fnfe);
                             throw new RuntimeException(fnfe);
                         }
@@ -463,7 +482,7 @@
                         }
                         out.flush();
                     } catch (IOException ioe) {
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Unable to read our spill file", ioe);
                         throw new RuntimeException(ioe);
                     }

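Besides the logging migration and the same spill fix shown for DefaultDataBag, the DistinctDataBag javadoc keeps its note that sorting on demand was found to be faster than storing the data in a TreeSet. Judging from the retained java.util.HashSet import, duplicates are dropped as tuples arrive and order is imposed only when the bag has to be read or spilled; a toy illustration of that strategy (generic names, not Pig classes):

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class DistinctThenSort<T extends Comparable<T>> {
    private final Set<T> contents = new HashSet<T>();

    public void add(T t) {
        contents.add(t);    // duplicates are silently dropped
    }

    public Iterator<T> sortedIterator() {
        // Sort once, only when an ordered view is actually needed; per the
        // javadoc above, this beat maintaining a TreeSet on every insert.
        List<T> sorted = new ArrayList<T>(contents);
        Collections.sort(sorted);
        return sorted.iterator();
    }
}
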
Modified: incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java Tue Feb 26 16:51:49 2008
@@ -26,36 +26,36 @@
  */
 public class IndexedTuple extends DefaultTuple {
 
-	public int index = -1;
-	
-	public IndexedTuple() {
-	}
-	
-	public IndexedTuple(Tuple t, int indexIn) {
+    public int index = -1;
+    
+    public IndexedTuple() {
+    }
+    
+    public IndexedTuple(Tuple t, int indexIn) {
         // Have to do it like this because Tuple is an interface, we don't
         // have access to its internal structures.
         super(t.getAll());
-		index = indexIn;
-	}
+        index = indexIn;
+    }
 
-	@Override
-	public String toString() {
-		return super.toString() + "[" + index + "]";
-	}
+    @Override
+    public String toString() {
+        return super.toString() + "[" + index + "]";
+    }
 
-	// Writable methods:
-	@Override
-	public void write(DataOutput out) throws IOException {
-		super.write(out);
+    // Writable methods:
+    @Override
+    public void write(DataOutput out) throws IOException {
+        super.write(out);
         out.writeInt(index);
-	}
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		super.readFields(in);
-		index = in.readInt();
-	}
-	
-	public Tuple toTuple(){
+    }
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        index = in.readInt();
+    }
+    
+    public Tuple toTuple(){
         return TupleFactory.getInstance().newTuple(mFields);
-	}
+    }
 }

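The reformatted IndexedTuple keeps its Writable contract: write() serializes the underlying tuple and then the extra int index, and readFields() restores them in the same order. A small round-trip check of exactly those methods (tuple construction via TupleFactory follows the pattern used elsewhere in this commit; the field value and index are arbitrary):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class IndexedTupleRoundTrip {
    public static void main(String[] args) throws Exception {
        Tuple inner = TupleFactory.getInstance().newTuple(1);
        inner.set(0, "payload");
        IndexedTuple it = new IndexedTuple(inner, 3);

        // Serialize: tuple fields first, then the index int.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        it.write(new DataOutputStream(bos));

        // Deserialize into a fresh instance and confirm the index survived.
        IndexedTuple copy = new IndexedTuple();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(copy);    // prints the tuple followed by "[3]"
    }
}
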
Modified: incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java Tue Feb 26 16:51:49 2008
@@ -17,13 +17,6 @@
  */
 package org.apache.pig.data;
 
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.ListIterator;
-import java.util.PriorityQueue;
-import java.util.Iterator;
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -32,16 +25,24 @@
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.PriorityQueue;
+  
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.util.PigLogger;
 
 
 
 /**
  * An ordered collection of Tuples (possibly) with multiples.  Data is
- * stored unsorted in an ArrayList as it comes in, and only sorted when it
- * is time to dump
+ * stored unsorted as it comes in, and only sorted when it is time to dump
 * it to a file or when the first iterator is requested.  Experimentation
 * found this to be faster than storing it sorted to begin with.
  * 
@@ -51,6 +52,8 @@
 public class SortedDataBag extends DefaultAbstractBag {
     private static TupleFactory gTupleFactory = TupleFactory.getInstance();
 
+    private final Log log = LogFactory.getLog(getClass());
+
     private Comparator<Tuple> mComp;
     private boolean mReadStarted = false;
 
@@ -104,8 +107,17 @@
         // trying to read while I'm mucking with the container.
         long spilled = 0;
         synchronized (mContents) {
+            DataOutputStream out = null;
+            try {
+                out = getSpillFile();
+            } catch (IOException ioe) {
+                // Do not remove the last file from the spill files array. It was
+                // never added, because File.createTempFile threw an IOException
+                log.error(
+                    "Unable to create tmp file to spill to disk", ioe);
+                return 0;
+            }
             try {
-                DataOutputStream out = getSpillFile();
                 // Have to sort the data before we can dump it.  It's bogus
                 // that we have to do this under the lock, but there's no way
                 // around it.  If the reads alread started, then we've
@@ -128,9 +140,17 @@
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
-                PigLogger.getLogger().error(
+                log.error(
                     "Unable to spill contents to disk", ioe);
                 return 0;
+            } finally {
+                if (out != null) {
+                    try {
+                        out.close();
+                    } catch (IOException e) {
+                        log.error("Error closing spill", e);
+                    }
+                }
             }
             mContents.clear();
         }
@@ -239,7 +259,7 @@
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    PigLogger.getLogger().fatal(
+                    log.fatal(
                         "Unable to find our spill file", fnfe);
                     throw new RuntimeException(fnfe);
                 }
@@ -253,11 +273,11 @@
                     } catch (EOFException eof) {
                         // This should never happen, it means we
                         // didn't dump all of our tuples to disk.
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Ran out of tuples too soon.", eof);
-                        throw new RuntimeException("Ran out of tuples to read prematurely.");
+                        throw new RuntimeException("Ran out of tuples to read prematurely.", eof);
                     } catch (IOException ioe) {
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Unable to read our spill file", ioe);
                         throw new RuntimeException(ioe);
                     }
@@ -304,7 +324,7 @@
                     } catch (FileNotFoundException fnfe) {
                         // We can't find our own spill file?  That should
                         // never happen.
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Unable to find our spill file", fnfe);
                         throw new RuntimeException(fnfe);
                     }
@@ -363,7 +383,7 @@
                     // this file.
                     mStreams.set(fileNum, null);
                 } catch (IOException ioe) {
-                    PigLogger.getLogger().fatal(
+                    log.fatal(
                         "Unable to read our spill file", ioe);
                     throw new RuntimeException(ioe);
                 }
@@ -427,7 +447,7 @@
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                             // never happen.
-                            PigLogger.getLogger().fatal(
+                            log.fatal(
                                 "Unable to find our spill file", fnfe);
                             throw new RuntimeException(fnfe);
                         }
@@ -446,7 +466,7 @@
                         }
                         out.flush();
                     } catch (IOException ioe) {
-                        PigLogger.getLogger().fatal(
+                        log.fatal(
                             "Unable to read our spill file", ioe);
                         throw new RuntimeException(ioe);
                     }

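The reworded SortedDataBag javadoc describes the lazy-sorting approach: tuples are kept unsorted as they arrive and sorted only when they must be dumped to a file or when the first iterator is requested. Reduced to its essence (hypothetical names, not the Pig implementation, which also has to merge spill files):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

public class LazySortedBag<T> {
    private final List<T> contents = new ArrayList<T>();
    private final Comparator<T> comp;
    private boolean readStarted = false;

    public LazySortedBag(Comparator<T> comp) {
        this.comp = comp;
    }

    public void add(T t) {
        contents.add(t);    // additions stay cheap: no per-insert ordering
    }

    public Iterator<T> iterator() {
        if (!readStarted) {
            // Pay the sorting cost once, on first read (or on spill).
            Collections.sort(contents, comp);
            readStarted = true;
        }
        return contents.iterator();
    }
}
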
Modified: incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java Tue Feb 26 16:51:49 2008
@@ -21,48 +21,50 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 public class TimestampedTuple extends DefaultTuple {
 
+    private final Log log = LogFactory.getLog(getClass());
+    static String defaultDelimiter = "[,\t]";
+
     protected double timestamp = 0;      // timestamp of this tuple
     protected boolean heartbeat = false;  // true iff this is a heartbeat (i.e. purpose is just to convey new timestamp; carries no data)
     
     public double getTimeStamp() {
-    	return timestamp;
+        return timestamp;
     }
     public void setTimeStamp(double t) {
-    	this.timestamp = t;
+        this.timestamp = t;
     }
     public boolean isHeartBeat() {
-    	return heartbeat;
+        return heartbeat;
     }
     public void setHeartBeat(boolean h) {
-    	this.heartbeat = h;
+        this.heartbeat = h;
     }
     public TimestampedTuple(int numFields) {
         super(numFields);
     }
     
-    /*
     public TimestampedTuple(String textLine, String delimiter, int timestampColumn, 
-    						SimpleDateFormat dateFormat){
+                            SimpleDateFormat dateFormat){
         if (delimiter == null) {
             delimiter = defaultDelimiter;
         }
         String[] splitString = textLine.split(delimiter, -1);
-        fields = new ArrayList<Datum>(splitString.length-1);
+        mFields = new ArrayList<Object>(splitString.length-1);
         for (int i = 0; i < splitString.length; i++) {
-        	if (i==timestampColumn){
-        		try{
-        			timestamp = dateFormat.parse(splitString[i]).getTime()/1000.0;
-        		}catch(ParseException e){
-        			System.err.println("Could not parse timestamp " + splitString[i]);
-        		}
-        	}else{
-        		fields.add(new DataAtom(splitString[i]));
-        	}
+            if (i==timestampColumn){
+                try{
+                    timestamp = dateFormat.parse(splitString[i]).getTime()/1000.0;
+                }catch(ParseException e){
+                    log.error("Could not parse timestamp " + splitString[i]);
+                }
+            }else{
+                mFields.add(new String(splitString[i]));
+            }
         }
     }
-    */
-
-    
 }

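The previously commented-out TimestampedTuple constructor is now live: it splits a text line on the given delimiter (defaulting to "[,\t]"), parses the chosen column into a timestamp in seconds since the epoch, and stores the remaining columns as fields. Hypothetical usage, with an invented date format and sample line:

import java.text.SimpleDateFormat;

import org.apache.pig.data.TimestampedTuple;

public class TimestampedTupleExample {
    public static void main(String[] args) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        TimestampedTuple t = new TimestampedTuple(
                "alice,2008-02-26 16:51:49,click", ",", 1, fmt);

        // Column 1 was consumed as the timestamp; "alice" and "click"
        // remain as the tuple's fields.
        System.out.println(t.getTimeStamp());
    }
}
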
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/FunctionInstantiator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/FunctionInstantiator.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/FunctionInstantiator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/FunctionInstantiator.java Tue Feb 26 16:51:49 2008
@@ -21,5 +21,5 @@
 
 public interface FunctionInstantiator {
 
-	public Object instantiateFuncFromAlias(String alias) throws IOException;
+    public Object instantiateFuncFromAlias(String alias) throws IOException;
 }