Posted to commits@pig.apache.org by rd...@apache.org on 2010/01/20 21:08:32 UTC

svn commit: r901360 [2/2] - in /hadoop/pig/branches/load-store-redesign: contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/stats/ contrib/piggybank/ja...

Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/PigStorageSchema.java Wed Jan 20 20:08:28 2010
@@ -20,27 +20,13 @@
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.ExecType;
-import org.apache.pig.experimental.JsonMetadata;
-import org.apache.pig.experimental.LoadMetadata;
-import org.apache.pig.experimental.StoreMetadata;
-import org.apache.pig.experimental.ResourceSchema;
-import org.apache.pig.experimental.ResourceStatistics;
-import org.apache.pig.StoreConfig;
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreMetadata;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
 /**
  *  This Load/Store Func reads/writes metafiles that allow the schema and 
@@ -54,9 +40,7 @@
  *  Due to StoreFunc limitations, you can only write the metafiles in MapReduce 
  *  mode. You can read them in Local or MapReduce mode.
  */
-public class PigStorageSchema extends PigStorage implements StoreMetadata {
-
-    private static final Log log = LogFactory.getLog(PigStorageSchema.class);
+public class PigStorageSchema extends PigStorage implements LoadMetadata, StoreMetadata {
 
     public PigStorageSchema() {
         super();
@@ -65,61 +49,50 @@
     public PigStorageSchema(String delim) {
         super(delim);
     }
+     
+    //------------------------------------------------------------------------
+    // Implementation of LoadMetadata interface
     
     @Override
-    public Schema determineSchema(String fileName, ExecType execType,
-            DataStorage storage) throws IOException {
+    public ResourceSchema getSchema(String location,
+            Configuration conf) throws IOException {
+        return (new JsonMetadata()).getSchema(location, conf);
+    }
+
+    @Override
+    public ResourceStatistics getStatistics(String location,
+            Configuration conf) throws IOException {        
+        return null;
+    }
 
-        // TODO fullPath should be retrieved ia relativeToAbsolutePath once PIG-966 is complete
-        String fullPath = FileLocalizer.fullPath(fileName, storage);
-        LoadMetadata metadataLoader = new JsonMetadata(fullPath, storage);
-        ResourceSchema resourceSchema = metadataLoader.getSchema(fullPath, null);
-        if (resourceSchema == null) {
-            return null;
-        }
-        Schema pigSchema = new Schema();
-        for (ResourceSchema.ResourceFieldSchema field : resourceSchema.getFields()) {
-            FieldSchema pigFieldSchema = DataType.determineFieldSchema(field);
-            // determineFieldSchema only sets the types. we also want the aliases.
-            // TODO this doesn't work properly for complex types
-            pigFieldSchema.alias = field.getName();
-            pigSchema.add(pigFieldSchema);
-        }
-        log.info("Loaded Schema: "+pigSchema);
-        return pigSchema;
+    @Override
+    public void setPartitionFilter(Expression partitionFilter)
+            throws IOException { 
+    }
+    
+    @Override
+    public String[] getPartitionKeys(String location, Configuration conf)
+            throws IOException {
+        return null;
     }
 
+    //------------------------------------------------------------------------
+    // Implementation of StoreMetadata
+
     @Override
-    public void finish() throws IOException {
-        super.finish();
-        JobConf jobConf = PigMapReduce.sJobConf;
-        if(jobConf != null){
-            StoreConfig storeConfig = MapRedUtil.getStoreConfig(jobConf);
-            DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(jobConf));
-            Schema schema = storeConfig.getSchema();
-            ResourceSchema resourceSchema = new ResourceSchema(schema);
-            JsonMetadata metadataWriter = new JsonMetadata(storeConfig.getLocation(), store);
-            metadataWriter.setFieldDel(fieldDel);
-            metadataWriter.setRecordDel(recordDel);
-            metadataWriter.setSchema(resourceSchema, storeConfig.getLocation(), null);
-        }
-    }
-
-    /**
-     * @see org.apache.pig.experimental.StoreMetadata#setSchema(ResourceSchema)
-     * Does not do anything in this implementation. The finish() call writes the schema.
-     */
-    @Override
-    public void setSchema(ResourceSchema schema, String location, Configuration conf) throws IOException {
-        // n\a
-    }
-
-    /**
-     * @see org.apache.pig.experimental.StoreMetadata#setStatistics(ResourceStatistics)
-     * Does not do anything in this implementation.
-     */
+    public void storeSchema(ResourceSchema schema, String location,
+            Configuration conf) throws IOException {
+        JsonMetadata metadataWriter = new JsonMetadata();
+        byte fieldDel = '\t';
+        byte recordDel = '\n';
+        metadataWriter.setFieldDel(fieldDel);
+        metadataWriter.setRecordDel(recordDel);
+        metadataWriter.storeSchema(schema, location, conf);               
+    }
+
     @Override
-    public void setStatistics(ResourceStatistics stats, String location, Configuration conf) throws IOException {
-        // n\a
+    public void storeStatistics(ResourceStatistics stats, String location,
+            Configuration conf) throws IOException {
+        
     }
 }
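
A minimal usage sketch for the reworked interface, assuming a schema
metafile was already written at the (hypothetical) location below by an
earlier store through PigStorageSchema:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.piggybank.storage.PigStorageSchema;

    public class SchemaReadExample {
        public static void main(String[] args) throws Exception {
            String location = "/tmp/example-output"; // hypothetical path
            PigStorageSchema storage = new PigStorageSchema();
            // getSchema() delegates to JsonMetadata and returns null when
            // no metafile is found at the location.
            ResourceSchema schema = storage.getSchema(location, new Configuration());
            System.out.println(schema == null ? "no schema metafile" : schema);
        }
    }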

Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/RegExLoader.java Wed Jan 20 20:08:28 2010
@@ -14,24 +14,23 @@
 package org.apache.pig.piggybank.storage;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.pig.ExecType;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.pig.LoadFunc;
-
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DefaultTupleFactory;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.FrontendException;
 
 /**
  * RegExLoader is an abstract class used to parse logs based on a regular expression.
@@ -42,35 +41,32 @@
  * Look to org.apache.pig.piggybank.storage.apachelog.CommonLogLoader for example usage.
  */
 
-public abstract class RegExLoader extends  Utf8StorageConverter implements LoadFunc {
-  protected BufferedPositionedInputStream in = null;
-  long end = Long.MAX_VALUE;
-  private byte recordDel = (byte) '\n';
-  private String fieldDel = "\t";
-  final private static Charset utf8 = Charset.forName("UTF8");
-  OutputStream os;
-
+public abstract class RegExLoader extends LoadFunc {
+  private LineRecordReader in = null;
+  
   abstract public Pattern getPattern();
 
-  public RegExLoader() {
-  }
-
+  @Override
   public Tuple getNext() throws IOException {
-    if (in == null || in.getPosition() > end) {
+    if (!in.nextKeyValue()) {
       return null;
     }
+    
     Pattern pattern = getPattern();
     Matcher matcher = pattern.matcher("");
     TupleFactory mTupleFactory = DefaultTupleFactory.getInstance();
     String line;
+    
     boolean tryNext = true;
     while (tryNext) {
-      if ((line = in.readLine(utf8, recordDel)) == null) {
+      Text val = in.getCurrentValue();
+      if (val == null) {
         break;
       }
-      if (line.length() > 0 && line.charAt(line.length() - 1) == '\r')
+      line = val.toString();
+      if (line.length() > 0 && line.charAt(line.length() - 1) == '\r') {
         line = line.substring(0, line.length() - 1);
-
+      }
       matcher = matcher.reset(line);
       ArrayList<DataByteArray> list = new ArrayList<DataByteArray>();
       if (matcher.find()) {
@@ -82,39 +78,25 @@
       }
     }
 
-
     return null;
   }
-
-  public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
-    this.in = in;
-    this.end = end;
-
-    // Since we are not block aligned we throw away the first
-    // record and could on a different instance to read it
-    if (offset != 0) {
-      getNext();
-    }
-  }
-
-  public void bindTo(OutputStream os) throws IOException {
-    this.os = os;
-  }
-
-  public void putNext(Tuple f) throws IOException {
-    os.write((f.toDelimitedString(this.fieldDel) + (char) this.recordDel).getBytes("utf8"));
-  }
-
-  public void finish() throws IOException {
+  
+  @SuppressWarnings("unchecked")
+  @Override
+  public InputFormat getInputFormat() throws IOException {
+      return new TextInputFormat();
   }
 
-  public Schema determineSchema(String fileName, ExecType execType, DataStorage storage) throws IOException {
-    return null;
+  @SuppressWarnings("unchecked")
+  @Override
+  public void prepareToRead(RecordReader reader, PigSplit split)
+          throws IOException {
+      in = (LineRecordReader) reader;
   }
 
   @Override
-  public LoadFunc.RequiredFieldResponse fieldsToRead(LoadFunc.RequiredFieldList requiredFieldList) throws FrontendException {
-      return new LoadFunc.RequiredFieldResponse(false);
+  public void setLocation(String location, Job job) throws IOException {
+      FileInputFormat.setInputPaths(job, location);      
   }
 
 }
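
Under the new API a concrete loader only supplies the pattern; Pig hands
the loader a LineRecordReader through prepareToRead() instead of a
positioned stream. A minimal sketch in the style of CommonLogLoader, using
a hypothetical key=value line format:

    import java.util.regex.Pattern;
    import org.apache.pig.piggybank.storage.RegExLoader;

    public class KeyValueLoader extends RegExLoader {
        // Hypothetical format: one "key=value" pair per line; the two
        // capture groups become the fields of each returned tuple.
        private final static Pattern pattern = Pattern.compile("^(\\S+)=(\\S+)$");

        @Override
        public Pattern getPattern() {
            return pattern;
        }
    }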

Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/SequenceFileLoader.java Wed Jan 20 20:08:28 2010
@@ -20,40 +20,31 @@
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-
-import org.apache.pig.ExecType;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.SamplableLoader;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
+import org.apache.pig.FileInputLoadFunc;
 import org.apache.pig.backend.BackendException;
-import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.data.DataBag;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 /**
  * A Loader for Hadoop-Standard SequenceFiles.
@@ -61,10 +52,10 @@
  * Text, IntWritable, LongWritable, FloatWritable, DoubleWritable, BooleanWritable, ByteWritable
  **/
 
-public class SequenceFileLoader implements LoadFunc, SamplableLoader {
+public class SequenceFileLoader extends FileInputLoadFunc {
   
-  private SequenceFile.Reader reader;
-  private long end;
+  private SequenceFileRecordReader<Writable, Writable> reader;
+ 
   private Writable key;
   private Writable value;
   private ArrayList<Object> mProtoTuple = null;
@@ -73,74 +64,29 @@
   protected TupleFactory mTupleFactory = TupleFactory.getInstance();
   protected SerializationFactory serializationFactory;
 
-  protected byte keyType;
-  protected byte valType;
+  protected byte keyType = DataType.UNKNOWN;
+  protected byte valType = DataType.UNKNOWN;
     
   public SequenceFileLoader() {
-  
+    mProtoTuple = new ArrayList<Object>(2);
   }
-  
-  @Override
-  public void bindTo(String fileName, BufferedPositionedInputStream is,
-      long offset, long end) throws IOException {
-    
-    inferReader(fileName);
-    if (offset != 0)
-      reader.sync(offset);
-
-    this.end = end;
-    
-    try {
-      this.key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), PigMapReduce.sJobConf);
-      this.value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), PigMapReduce.sJobConf);
-    } catch (ClassCastException e) {
-      throw new RuntimeException("SequenceFile contains non-Writable objects", e);
-    }
-    setKeyValueTypes(key.getClass(), value.getClass());
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public Schema determineSchema(String fileName, ExecType execType,
-      DataStorage storage) throws IOException {
-    inferReader(fileName);
-    Class<Writable> keyClass = null;
-    Class<Writable> valClass= null;
-    try {
-      keyClass = (Class<Writable>) reader.getKeyClass();
-      valClass = (Class<Writable>) reader.getValueClass();
-    } catch (ClassCastException e) {
-      throw new RuntimeException("SequenceFile contains non-Writable objects", e);
-    }
-    Schema schema = new Schema();
-    setKeyValueTypes(keyClass, valClass);  
-    schema.add(new Schema.FieldSchema(null, keyType));
-    schema.add(new Schema.FieldSchema(null, valType));
-    return schema;
-  }
-
-  protected void setKeyValueTypes(Class<?> keyClass, Class<?> valueClass) throws BackendException {
+ 
+  protected void setKeyType(Class<?> keyClass) throws BackendException {
     this.keyType |= inferPigDataType(keyClass);
-    this.valType |= inferPigDataType(valueClass);
     if (keyType == DataType.ERROR) { 
       LOG.warn("Unable to translate key "+key.getClass()+" to a Pig datatype");
       throw new BackendException("Unable to translate "+key.getClass()+" to a Pig datatype");
     } 
-    if (valType == DataType.ERROR) {
-      LOG.warn("Unable to translate value "+value.getClass()+" to a Pig datatype");
-      throw new BackendException("Unable to translate "+value.getClass()+" to a Pig datatype");
-    }
-
-  }
-  protected void inferReader(String fileName) throws IOException {
-    if (reader == null) {
-      Configuration conf = new Configuration();
-      Path path = new Path(fileName);
-      FileSystem fs = FileSystem.get(path.toUri(), conf);
-      reader = new SequenceFile.Reader(fs, path, conf);
-    }
   }
   
+  protected void setValueType(Class<?> valueClass) throws BackendException {
+    this.valType |= inferPigDataType(valueClass);
+    if (valType == DataType.ERROR) { 
+      LOG.warn("Unable to translate value "+value.getClass()+" to a Pig datatype");
+      throw new BackendException("Unable to translate "+value.getClass()+" to a Pig datatype");
+    } 
+  }
+    
   protected byte inferPigDataType(Type t) {
     if (t == DataByteArray.class) return DataType.BYTEARRAY;
     else if (t == Text.class) return DataType.CHARARRAY;
@@ -169,77 +115,48 @@
   }
   
   @Override
-  public LoadFunc.RequiredFieldResponse fieldsToRead(LoadFunc.RequiredFieldList requiredFieldList) throws FrontendException {
-      return new LoadFunc.RequiredFieldResponse(false);
-  }
-
-  @Override
   public Tuple getNext() throws IOException {
-    if (mProtoTuple == null) mProtoTuple = new ArrayList<Object>(2);
-    if (reader != null && (reader.getPosition() < end || !reader.syncSeen()) && reader.next(key, value)) {
-      mProtoTuple.add(translateWritableToPigDataType(key, keyType));
-      mProtoTuple.add(translateWritableToPigDataType(value, valType));
-      Tuple t =  mTupleFactory.newTuple(mProtoTuple);
-      mProtoTuple.clear();
-      return t;
+    boolean next = false;
+    try {
+      next = reader.nextKeyValue();
+    } catch (InterruptedException e) {
+      throw new IOException(e);
     }
-    return null;
-  }
-
-  @Override
-  public long getPosition() throws IOException {
-    return reader.getPosition();
-  }
-
-  @Override
-  public Tuple getSampledTuple() throws IOException {
-    return this.getNext();
-  }
-
-  @Override
-  public long skip(long n) throws IOException {
-    long startPos = reader.getPosition();
-    reader.sync(startPos+n);
-    return reader.getPosition()-startPos;
-  }
-
-  @Override
-  public DataBag bytesToBag(byte[] b) throws IOException {
-    throw new FrontendException("SequenceFileLoader does not expect to cast data.");
-  }
-
-  @Override
-  public String bytesToCharArray(byte[] b) throws IOException {
-    throw new FrontendException("SequenceFileLoader does not expect to cast data.");
-  }
-
-  @Override
-  public Double bytesToDouble(byte[] b) throws IOException {
-    throw new FrontendException("SequenceFileLoader does not expect to cast data.");
-  }
-
-  @Override
-  public Float bytesToFloat(byte[] b) throws IOException {
-    throw new FrontendException("SequenceFileLoader does not expect to cast data.");
-  }
-
-  @Override
-  public Integer bytesToInteger(byte[] b) throws IOException {
-    throw new FrontendException("SequenceFileLoader does not expect to cast data.");
+    
+    if (!next) return null;
+    
+    key = reader.getCurrentKey();
+    value = reader.getCurrentValue();
+    
+    if (keyType == DataType.UNKNOWN && key != null) {
+        setKeyType(key.getClass());
+    }
+    if (valType == DataType.UNKNOWN && value != null) {
+        setValueType(value.getClass());
+    }
+    
+    mProtoTuple.add(translateWritableToPigDataType(key, keyType));
+    mProtoTuple.add(translateWritableToPigDataType(value, valType));
+    Tuple t =  mTupleFactory.newTuple(mProtoTuple);
+    mProtoTuple.clear();
+    return t;
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public Long bytesToLong(byte[] b) throws IOException {
-    throw new FrontendException("SequenceFileLoader does not expect to cast data.");
+  public InputFormat getInputFormat() throws IOException {
+    return new SequenceFileInputFormat<Writable, Writable>();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
-  public Map<String, Object> bytesToMap(byte[] b) throws IOException {
-    throw new FrontendException("SequenceFileLoader does not expect to cast data.");
+  public void prepareToRead(RecordReader reader, PigSplit split)
+        throws IOException {
+    this.reader = (SequenceFileRecordReader) reader;
   }
 
   @Override
-  public Tuple bytesToTuple(byte[] b) throws IOException {
-    throw new FrontendException("SequenceFileLoader does not expect to cast data.");
+  public void setLocation(String location, Job job) throws IOException {
+    FileInputFormat.setInputPaths(job, location);    
   }
 }
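
A usage sketch in the same style as the piggybank tests, assuming a
SequenceFile at a hypothetical path whose key and value classes are among
the supported Writables; key and value types are now inferred lazily in
getNext() from the first record read:

    import java.util.Iterator;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class SequenceFileLoaderExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            pig.registerQuery("A = LOAD '/tmp/data.seq' USING " // hypothetical path
                    + "org.apache.pig.piggybank.storage.SequenceFileLoader() AS (key, val);");
            Iterator<Tuple> it = pig.openIterator("A");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }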

Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCombinedLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCombinedLogLoader.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCombinedLogLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCombinedLogLoader.java Wed Jan 20 20:08:28 2010
@@ -13,21 +13,15 @@
 
 package org.apache.pig.piggybank.test.storage;
 
-import static org.apache.pig.ExecType.LOCAL;
-
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 
 import junit.framework.TestCase;
 
-import org.apache.pig.PigServer;
-
 import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.piggybank.storage.apachelog.CombinedLogLoader;
 import org.junit.Test;
 
@@ -84,27 +78,6 @@
     }
 
     @Test
-    public void testLoadFromBindTo() throws Exception {
-        String filename = TestHelper.createTempFile(data, " ");
-        CombinedLogLoader combinedLogLoader = new CombinedLogLoader();
-        PigServer pigServer = new PigServer(LOCAL);
-        InputStream inputStream = FileLocalizer.open(filename, pigServer.getPigContext());
-        combinedLogLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE);
-
-        int tupleCount = 0;
-
-        while (true) {
-            Tuple tuple = combinedLogLoader.getNext();
-            if (tuple == null)
-                break;
-            else {
-                TestHelper.examineTuple(EXPECTED, tuple, tupleCount);
-                tupleCount++;
-            }
-        }
-        assertEquals(data.size(), tupleCount);
-    }
-
     public void testLoadFromPigServer() throws Exception {
         String filename = TestHelper.createTempFile(data, " ");
         PigServer pig = new PigServer(ExecType.LOCAL);

Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCommonLogLoader.java Wed Jan 20 20:08:28 2010
@@ -13,21 +13,15 @@
 
 package org.apache.pig.piggybank.test.storage;
 
-import static org.apache.pig.ExecType.LOCAL;
-
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 
 import junit.framework.TestCase;
 
-import org.apache.pig.PigServer;
-
 import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.piggybank.storage.apachelog.CommonLogLoader;
 import org.junit.Test;
 
@@ -75,28 +69,6 @@
     }
 
     @Test
-    public void testLoadFromBindTo() throws Exception {
-        String filename = TestHelper.createTempFile(data, " ");
-        CommonLogLoader commonLogLoader = new CommonLogLoader();
-        PigServer pigServer = new PigServer(LOCAL);
-        
-        InputStream inputStream = FileLocalizer.open(filename, pigServer.getPigContext());
-        commonLogLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE);
-
-        int tupleCount = 0;
-
-        while (true) {
-            Tuple tuple = commonLogLoader.getNext();
-            if (tuple == null)
-                break;
-            else {
-                TestHelper.examineTuple(EXPECTED, tuple, tupleCount);
-                tupleCount++;
-            }
-        }
-        assertEquals(data.size(), tupleCount);
-    }
-
     public void testLoadFromPigServer() throws Exception {
         String filename = TestHelper.createTempFile(data, " ");
         PigServer pig = new PigServer(ExecType.LOCAL);

Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java Wed Jan 20 20:08:28 2010
@@ -150,7 +150,7 @@
         .getLocal(new Configuration()) : cluster.getFileSystem());
     Path output = new Path(outPath);
     Assert.assertTrue("Output dir does not exists!", fs.exists(output)
-        && fs.isDirectory(output));
+        && fs.getFileStatus(output).isDir());
 
     Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
     Assert.assertTrue("Split field dirs not found!", paths != null);
@@ -161,19 +161,26 @@
       Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
           files != null);
       for (Path filePath : files) {
-        if (fs.isFile(filePath)) {
-          BufferedReader reader = new BufferedReader(new InputStreamReader(fs
-              .open(filePath)));
-          String line = "";
-          while ((line = reader.readLine()) != null) {
-            String[] fields = line.split("\\t");
-            Assert.assertEquals(fields.length, 3);
-            Assert.assertEquals("Unexpected field value in the output record",
+        Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath));
+        
+        BufferedReader reader = new BufferedReader(new InputStreamReader(fs
+                .open(filePath)));
+        String line = "";
+        int count = 0;
+        while ((line = reader.readLine()) != null) {
+          String[] fields = line.split("\\t");
+          Assert.assertEquals(fields.length, 3);
+          Assert.assertEquals("Unexpected field value in the output record",
                 splitField, fields[1]);
-          }
-          reader.close();
-        }
+          count++;
+          System.out.println("field: " + fields[1]);
+        }        
+        reader.close();
+        Assert.assertEquals(count, 3);
       }
     }
   }
 }
+
+
+

Modified: hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java (original)
+++ hadoop/pig/branches/load-store-redesign/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestRegExLoader.java Wed Jan 20 20:08:28 2010
@@ -15,8 +15,8 @@
 
 import static org.apache.pig.ExecType.LOCAL;
 
-import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.regex.Pattern;
 
 import junit.framework.TestCase;
@@ -24,21 +24,21 @@
 import org.apache.pig.PigServer;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.piggybank.storage.RegExLoader;
+import org.apache.pig.test.Util;
 import org.junit.Test;
 
 public class TestRegExLoader extends TestCase {
     private static String patternString = "(\\w+),(\\w+);(\\w+)";
     private final static Pattern pattern = Pattern.compile(patternString);
 
-    class DummyRegExLoader extends RegExLoader {
+    public static class DummyRegExLoader extends RegExLoader {
+        public DummyRegExLoader() {}
+        
         @Override
         public Pattern getPattern() {
             return Pattern.compile(patternString);
         }
-
     }
 
     public static ArrayList<String[]> data = new ArrayList<String[]>();
@@ -49,32 +49,26 @@
     }
 
     @Test
-    public void testLoadFromBindTo() throws Exception {
-        //String filename = TestHelper.createTempFile(data, " ");
-        //System.err.println(filename);
-        DummyRegExLoader dummyRegExLoader = new DummyRegExLoader();
+    public void testLoadFromBindTo() throws Exception {       
         PigServer pigServer = new PigServer(LOCAL);
         
         String filename = TestHelper.createTempFile(data, "");
-        /*org.apache.pig.test.Util.createInputFile("tmp", "", 
-            new String[]{"1,one;i", "2,two;ii", "3,three;iii"}
-        );
-        
-        String filename = input.getAbsolutePath();
-        */
-        InputStream inputStream = FileLocalizer.open(filename, pigServer.getPigContext());
-        dummyRegExLoader.bindTo(filename, new BufferedPositionedInputStream(inputStream), 0, Long.MAX_VALUE);
         ArrayList<DataByteArray[]> expected = TestHelper.getExpected(data, pattern);
+        
+        pigServer.registerQuery("A = LOAD 'file:" + Util.encodeEscape(filename) + 
+                "' USING " + DummyRegExLoader.class.getName() + "() AS (key, val);");
+        Iterator<?> it = pigServer.openIterator("A");
         int tupleCount = 0;
-        while (true) {
-            Tuple tuple = dummyRegExLoader.getNext();
+        while (it.hasNext()) {
+            Tuple tuple = (Tuple) it.next();
             if (tuple == null)
-                break;
+              break;
             else {
-                TestHelper.examineTuple(expected, tuple, tupleCount);
-                tupleCount++;
+              TestHelper.examineTuple(expected, tuple, tupleCount);
+              tupleCount++;
             }
-        }
+          }
         assertEquals(data.size(), tupleCount);
     }
+        
 }

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/LoadMetadata.java Wed Jan 20 20:08:28 2010
@@ -20,7 +20,6 @@
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.pig.impl.plan.OperatorPlan;
 
 /**
  * This interface defines how to retrieve metadata related to data to be loaded.

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceStatistics.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceStatistics.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceStatistics.java Wed Jan 20 20:08:28 2010
@@ -17,28 +17,241 @@
  */
 package org.apache.pig;
 
-public class ResourceStatistics {
+import java.io.Serializable;
+import java.util.Arrays;
 
-    public static class ResourceFieldStatistics {
+public class ResourceStatistics implements Cloneable {
 
-        int version;
+    /* Getters intentionally return mutable arrays instead of copies,
+     * to simplify updates without unnecessary copying.
+     * Setters make a copy of the arrays in order to prevent an array
+     * from being shared by two objects, with modifications in one
+     * accidentally changing the other.
+     */
+    
+    // arrays are initialized to empty so we don't have to worry about NPEs
+    // setters disallow setting them to null.
+    
+    private static final long serialVersionUID = 1L;
+    public Long mBytes; // size in megabytes
+    public Long numRecords;  // number of records
+    public Long avgRecordSize;
+    public ResourceFieldStatistics[] fields = new ResourceFieldStatistics[0];
 
-        enum Distribution {UNIFORM, NORMAL, POWER};
+    public static class ResourceFieldStatistics implements Serializable {
 
-        public long numDistinctValues;  // number of distinct values represented in this field
-        public Distribution distribution; // how values in this field are distributed
+        public static final long serialVersionUID = 1L;
+
+        public int version;
+
+        public Long numDistinctValues;  // number of distinct values represented in this field
 
         // We need some way to represent a histogram of values in the field,
         // as those will be useful.  However, we can't count on being
         // able to hold such histograms in memory.  Have to figure out
         // how they can be kept on disk and represented here.
 
-        // Probably more in here
+        // For now, don't create so many buckets that you can't hold them in memory.
+        
+        // an ordered array of the most common values, 
+        // in descending order of frequency
+        public Object[] mostCommonValues = new Object[0];
+        
+        // an array that matches the mostCommonValues array, and lists
+        // the frequencies of those values as a fraction (0 through 1) of
+        // the total number of records
+        public float[] mostCommonValuesFreq = new float[0];
+        
+        // an ordered array of values, from min val to max val
+        // such that the number of records with values 
+        // between valueHistogram[i] and valueHistogram[i+1] is
+        // roughly equal for all values of i.
+        // NOTE: if mostCommonValues is non-empty, the values in that array
+        // should not be included in the histogram. Adjust accordingly.
+        public Object[] valueHistogram = new Object[0];
+
+        
+        public int getVersion() {
+            return version;
+        }
+
+        public ResourceFieldStatistics setVersion(int version) {
+            this.version = version;  
+            return this;
+        }
+
+        public Long getNumDistinctValues() {
+            return numDistinctValues;
+        }
+
+        public ResourceFieldStatistics setNumDistinctValues(Long numDistinctValues) {
+            this.numDistinctValues = numDistinctValues; 
+            return this;
+        }
+
+        public Object[] getMostCommonValues() {
+            return mostCommonValues;
+        }
+
+        public ResourceFieldStatistics  setMostCommonValues(Object[] mostCommonValues) {
+            if (mostCommonValues !=null)
+                this.mostCommonValues = 
+                    Arrays.copyOf(mostCommonValues, mostCommonValues.length);
+            return this;
+        }
+
+        public float[] getMostCommonValuesFreq() {
+            return mostCommonValuesFreq;
+        }
+
+        public ResourceFieldStatistics setMostCommonValuesFreq(float[] mostCommonValuesFreq) {
+            if (mostCommonValuesFreq != null) 
+                this.mostCommonValuesFreq = 
+                    Arrays.copyOf(mostCommonValuesFreq, mostCommonValuesFreq.length);
+            return this;
+        }
+
+        public Object[] getValueHistogram() {
+            return valueHistogram;
+        }
+
+        public ResourceFieldStatistics  setValueHistogram(Object[] valueHistogram) {
+            if (valueHistogram != null) 
+                this.valueHistogram = Arrays.copyOf(valueHistogram, valueHistogram.length);
+            return this;
+        }
+
+        
+        /*
+         * equals() and hashCode() are overridden mostly for ease of testing;
+         * you shouldn't encounter a situation in which you need to .equals()
+         * two sets of statistics on different objects "in the wild".
+         */
+        @Override
+        public boolean equals(Object anOther) {
+            if (anOther == null || !(anOther.getClass().equals(this.getClass())))
+                return false;
+            ResourceFieldStatistics other = (ResourceFieldStatistics) anOther;
+            // the array setters do not allow nulls, but numDistinctValues
+            // may still be null, so compare it null-safely
+            return (Arrays.equals(mostCommonValues, other.mostCommonValues) &&
+                    Arrays.equals(mostCommonValuesFreq, other.mostCommonValuesFreq) &&
+                    Arrays.equals(valueHistogram, other.valueHistogram) &&
+                    ((numDistinctValues == null) ? other.numDistinctValues == null
+                            : numDistinctValues.equals(other.numDistinctValues)) &&
+                    this.version == other.version
+                    );
+        }
+        
+        /**
+         * A naive hashCode implementation following the example in IBM's developerworks:
+         * http://www.ibm.com/developerworks/java/library/j-jtp05273.html
+         */
+        @Override
+        public int hashCode() {
+            int hash = 1;
+            hash = 31 * hash + Arrays.hashCode(mostCommonValues);
+            hash = 31 * hash + Arrays.hashCode(mostCommonValuesFreq);
+            hash = 31 * hash + (numDistinctValues == null ? 0 : numDistinctValues.hashCode());
+            hash = 31 * hash + Arrays.hashCode(valueHistogram);
+            hash = 31 * hash + version;
+            return hash;
+        }
+        
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder("ResourceFieldStatistics. Version: "+version+"\n");
+            sb.append("MCV:\n");
+            for (Object o : mostCommonValues) sb.append('['+ o.toString() +']');
+            sb.append("\n MCVfreq:\n");
+            for (Float f : mostCommonValuesFreq) sb.append('['+f.toString()+']');
+            sb.append("\n");
+            sb.append("numDistVals: "+numDistinctValues);
+            sb.append("valHistogram: \n");
+            for (Object o : valueHistogram) sb.append('['+o.toString()+']');
+            sb.append("\n");
+            return sb.toString();
+        }
+    }
+
+    
+    public Long getmBytes() {
+        return mBytes;
+    }
+    public ResourceStatistics setmBytes(Long mBytes) {
+        this.mBytes = mBytes;
+        return this;
+    }
+    public Long getNumRecords() {
+        return numRecords;
+    }
+    public ResourceStatistics setNumRecords(Long numRecords) {
+        this.numRecords = numRecords;
+        return this;
+    }
+    
+    /* 
+     * Returns the average record size. The value can be specified explicitly
+     * in the statistics; if absent, it is computed as mBytes / numRecords.
+     * Returns null if it cannot be computed.
+     */
+    public Long getAvgRecordSize() {
+        if (avgRecordSize == null && (mBytes != null && numRecords != null))
+            return mBytes / numRecords;
+        else 
+            return avgRecordSize;
+    }
+    
+    public void setAvgRecordSize(Long size) {
+        avgRecordSize = size;
+    }
+    
+    public ResourceFieldStatistics[] getFields() {
+        return fields;
+    }
+    
+    public ResourceStatistics setFields(ResourceFieldStatistics[] fields) {
+        if (fields != null) 
+            this.fields = Arrays.copyOf(fields, fields.length);
+        return this;
     }
 
-    public long mBytes; // size in megabytes
-    public long numRecords;  // number of records
-    public ResourceFieldStatistics[] fields;
 
+    /*
+     * equals() and hashCode() are overridden mostly for ease of testing;
+     * you shouldn't encounter a situation in which you need to .equals()
+     * two sets of statistics on different objects "in the wild".
+     */
+    @Override
+    public boolean equals(Object anOther) {
+        if (anOther == null || !(anOther.getClass().equals(this.getClass())))
+            return false;        
+        ResourceStatistics other = (ResourceStatistics) anOther;
+        return (Arrays.equals(fields, other.fields) &&
+                ((mBytes==null) 
+                        ? (other.mBytes==null) : mBytes.equals(other.mBytes)) &&
+                ((numRecords == null) 
+                        ? (other.numRecords==null) : numRecords.equals(other.numRecords)) 
+        );
+    }
+    
+    @Override
+    public int hashCode() {
+        int hash = 1;
+        hash = 31*hash + Arrays.hashCode(fields);
+        hash = 31*hash + (mBytes == null ? 0 : mBytes.hashCode());
+        hash = 31*hash + (numRecords == null ? 0 : numRecords.hashCode());
+        return hash;
+    }
     // Probably more in here
+    
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("Field Stats: \n");
+        for (ResourceFieldStatistics f : fields) sb.append(f.toString());
+        sb.append("mBytes: "+mBytes);
+        sb.append("numRecords: "+numRecords);
+        return sb.toString();
+    }
+    
+    public Object clone() throws CloneNotSupportedException {
+        return super.clone();
+    }
 }
\ No newline at end of file
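
Since the new setters return the receiver, statistics can be built
fluently. A small sketch (the numbers are made up):

    import org.apache.pig.ResourceStatistics;
    import org.apache.pig.ResourceStatistics.ResourceFieldStatistics;

    public class StatsExample {
        public static void main(String[] args) {
            ResourceFieldStatistics field = new ResourceFieldStatistics()
                    .setVersion(1)
                    .setNumDistinctValues(42L)
                    .setMostCommonValues(new Object[] { "a", "b" })
                    .setMostCommonValuesFreq(new float[] { 0.6f, 0.3f });
            ResourceStatistics stats = new ResourceStatistics()
                    .setmBytes(1024L)
                    .setNumRecords(512L)
                    .setFields(new ResourceFieldStatistics[] { field });
            // avgRecordSize is unset, so it falls back to mBytes / numRecords
            System.out.println(stats.getAvgRecordSize()); // prints 2
        }
    }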

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/StoreFunc.java Wed Jan 20 20:08:28 2010
@@ -15,8 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig;
-
+package org.apache.pig;
+
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
@@ -24,14 +24,15 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.pig.data.Tuple;
-
-
-/**
-* This interface is used to implement functions to write records
-* from a dataset.
-* 
-*
-*/
+import org.apache.pig.impl.util.UDFContext;
+
+
+/**
+* This interface is used to implement functions to write records
+* from a dataset.
+* 
+*
+*/
 
 public interface StoreFunc {
 
@@ -118,4 +119,4 @@
      * @param signature a unique signature to identify this StoreFunc
      */
     public void setStoreFuncUDFContextSignature(String signature);
-}
+}

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Wed Jan 20 20:08:28 2010
@@ -22,13 +22,18 @@
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreMetadata;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 
@@ -65,7 +70,6 @@
      */
     public PigOutputCommitter(TaskAttemptContext context)
             throws IOException {
-        
         // create and store the map and reduce output committers
         mapOutputCommitters = getCommitters(context, 
                 JobControlCompiler.PIG_MAP_STORES);
@@ -148,21 +152,36 @@
         return contextCopy;   
     }
 
+    private void storeCleanup(POStore store, Configuration conf)
+            throws IOException {
+        StoreFunc storeFunc = store.getStoreFunc();
+        if (storeFunc instanceof StoreMetadata) {
+            Schema schema = store.getSchema();
+            if (schema != null) {
+                ((StoreMetadata) storeFunc).storeSchema(
+                        new ResourceSchema(schema), store.getSFile()
+                                .getFileName(), conf);
+            }
+        }
+    }
+    
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapred.FileOutputCommitter#cleanupJob(org.apache.hadoop.mapred.JobContext)
      */
     @Override
     public void cleanupJob(JobContext context) throws IOException {
         // call clean up on all map and reduce committers
-        for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
+        for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {            
             JobContext updatedContext = setUpContext(context, 
                     mapCommitter.second);
+            storeCleanup(mapCommitter.second, context.getConfiguration());
             mapCommitter.first.cleanupJob(updatedContext);
         }
         for (Pair<OutputCommitter, POStore> reduceCommitter : 
-            reduceOutputCommitters) {
+            reduceOutputCommitters) {            
             JobContext updatedContext = setUpContext(context, 
                     reduceCommitter.second);
+            storeCleanup(reduceCommitter.second, context.getConfiguration());
             reduceCommitter.first.cleanupJob(updatedContext);
         }
        
@@ -172,7 +191,7 @@
      * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext)
      */
     @Override
-    public void abortTask(TaskAttemptContext context) throws IOException {
+    public void abortTask(TaskAttemptContext context) throws IOException {        
         if(context.getTaskAttemptID().isMap()) {
             for (Pair<OutputCommitter, POStore> mapCommitter : 
                 mapOutputCommitters) {
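
The effect of the storeCleanup() hook above: any StoreFunc that also
implements StoreMetadata now receives a storeSchema() callback from
cleanupJob(), once per map-side and reduce-side store. PigStorageSchema in
this commit is the concrete case; a stripped-down sketch of the contract,
with a hypothetical class name:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.ResourceSchema;
    import org.apache.pig.ResourceStatistics;
    import org.apache.pig.StoreMetadata;
    import org.apache.pig.builtin.PigStorage;

    public class SchemaAwareStorage extends PigStorage implements StoreMetadata {
        @Override
        public void storeSchema(ResourceSchema schema, String location,
                Configuration conf) throws IOException {
            // Hypothetical: persist the schema next to the data at `location`.
            System.err.println("persisting schema for " + location);
        }

        @Override
        public void storeStatistics(ResourceStatistics stats, String location,
                Configuration conf) throws IOException {
            // Statistics are not persisted in this sketch.
        }
    }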

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/builtin/BinStorage.java Wed Jan 20 20:08:28 2010
@@ -371,7 +371,7 @@
 
     @Override
     public void checkSchema(ResourceSchema s) throws IOException {
-        throw new UnsupportedOperationException();
+        
     }
 
     @Override

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java?rev=901360&r1=901359&r2=901360&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DataType.java Wed Jan 20 20:08:28 2010
@@ -28,7 +28,7 @@
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.pig.PigException;
-import org.apache.pig.experimental.ResourceSchema;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;