You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2011/12/23 18:53:13 UTC

svn commit: r1222766 [2/3] - in /incubator/accumulo/trunk: src/core/src/main/java/org/apache/accumulo/core/ src/core/src/main/java/org/apache/accumulo/core/client/impl/thrift/ src/core/src/main/java/org/apache/accumulo/core/file/ src/core/src/main/java...

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/file/map/MySequenceFile.java Fri Dec 23 17:53:12 2011
@@ -18,20 +18,14 @@
 
 package org.apache.accumulo.core.file.map;
 
-import java.io.BufferedOutputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
-import java.rmi.server.UID;
-import java.security.MessageDigest;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -40,40 +34,23 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.VersionMismatchException;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableName;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.zlib.ZlibFactory;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
-import org.apache.hadoop.io.serializer.Serializer;
-import org.apache.hadoop.util.MergeSort;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.hadoop.util.PriorityQueue;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -255,344 +232,6 @@ public class MySequenceFile {
     job.set("io.seqfile.compression.type", val.toString());
   }
   
-  /**
-   * Construct the preferred type of MySequenceFile Writer.
-   * 
-   * @param fs
-   *          The configured filesystem.
-   * @param conf
-   *          The configuration.
-   * @param name
-   *          The name of the file.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   */
-  public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass) throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass, getCompressionType(conf));
-  }
-  
-  /**
-   * Construct the preferred type of MySequenceFile Writer.
-   * 
-   * @param fs
-   *          The configured filesystem.
-   * @param conf
-   *          The configuration.
-   * @param name
-   *          The name of the file.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param compressionType
-   *          The compression type.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   */
-  public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType)
-      throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
-        fs.getDefaultBlockSize(), compressionType, new DefaultCodec(), null, new Metadata());
-  }
-  
-  /**
-   * Construct the preferred type of MySequenceFile Writer.
-   * 
-   * @param fs
-   *          The configured filesystem.
-   * @param conf
-   *          The configuration.
-   * @param name
-   *          The name of the file.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param compressionType
-   *          The compression type.
-   * @param progress
-   *          The Progressable object to track progress.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   */
-  public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType,
-      Progressable progress) throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
-        fs.getDefaultBlockSize(), compressionType, new DefaultCodec(), progress, new Metadata());
-  }
-  
-  /**
-   * Construct the preferred type of MySequenceFile Writer.
-   * 
-   * @param fs
-   *          The configured filesystem.
-   * @param conf
-   *          The configuration.
-   * @param name
-   *          The name of the file.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param compressionType
-   *          The compression type.
-   * @param codec
-   *          The compression codec.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   */
-  public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType,
-      CompressionCodec codec) throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
-        fs.getDefaultBlockSize(), compressionType, codec, null, new Metadata());
-  }
-  
-  /**
-   * Construct the preferred type of MySequenceFile Writer.
-   * 
-   * @param fs
-   *          The configured filesystem.
-   * @param conf
-   *          The configuration.
-   * @param name
-   *          The name of the file.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param compressionType
-   *          The compression type.
-   * @param codec
-   *          The compression codec.
-   * @param progress
-   *          The Progressable object to track progress.
-   * @param metadata
-   *          The metadata of the file.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   */
-  public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType,
-      CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
-    return createWriter(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(),
-        fs.getDefaultBlockSize(), compressionType, codec, progress, metadata);
-  }
-  
-  /**
-   * Construct the preferred type of MySequenceFile Writer.
-   * 
-   * @param fs
-   *          The configured filesystem.
-   * @param conf
-   *          The configuration.
-   * @param name
-   *          The name of the file.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param bufferSize
-   *          buffer size for the underlaying outputstream.
-   * @param replication
-   *          replication factor for the file.
-   * @param blockSize
-   *          block size for the file.
-   * @param compressionType
-   *          The compression type.
-   * @param codec
-   *          The compression codec.
-   * @param progress
-   *          The Progressable object to track progress.
-   * @param metadata
-   *          The metadata of the file.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   */
-  public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication,
-      long blockSize, CompressionType compressionType, CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
-    if ((codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) {
-      throw new IllegalArgumentException("MySequenceFile doesn't work with " + "GzipCodec without native-hadoop code!");
-    }
-    
-    Writer writer = null;
-    
-    if (compressionType == CompressionType.NONE) {
-      writer = new Writer(fs, conf, name, keyClass, valClass, bufferSize, replication, blockSize, progress, metadata);
-    } else if (compressionType == CompressionType.RECORD) {
-      writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass, bufferSize, replication, blockSize, codec, progress, metadata);
-    } else if (compressionType == CompressionType.BLOCK) {
-      writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass, bufferSize, replication, blockSize, codec, progress, metadata);
-    }
-    
-    return writer;
-  }
-  
-  /**
-   * Construct the preferred type of MySequenceFile Writer.
-   * 
-   * @param fs
-   *          The configured filesystem.
-   * @param conf
-   *          The configuration.
-   * @param name
-   *          The name of the file.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param compressionType
-   *          The compression type.
-   * @param codec
-   *          The compression codec.
-   * @param progress
-   *          The Progressable object to track progress.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   */
-  public static Writer createWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionType compressionType,
-      CompressionCodec codec, Progressable progress) throws IOException {
-    Writer writer = createWriter(fs, conf, name, keyClass, valClass, compressionType, codec, progress, new Metadata());
-    return writer;
-  }
-  
-  /**
-   * Construct the preferred type of 'raw' MySequenceFile Writer.
-   * 
-   * @param out
-   *          The stream on top which the writer is to be constructed.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param compress
-   *          Compress data?
-   * @param blockCompress
-   *          Compress blocks?
-   * @param metadata
-   *          The metadata of the file.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   * @throws IOException
-   */
-  private static Writer createWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, boolean blockCompress,
-      CompressionCodec codec, Metadata metadata) throws IOException {
-    if (codec != null && (codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) {
-      throw new IllegalArgumentException("MySequenceFile doesn't work with " + "GzipCodec without native-hadoop code!");
-    }
-    
-    Writer writer = null;
-    
-    if (!compress) {
-      writer = new Writer(conf, out, keyClass, valClass, metadata);
-    } else if (compress && !blockCompress) {
-      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
-    } else {
-      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
-    }
-    
-    return writer;
-  }
-  
-  /**
-   * Construct the preferred type of 'raw' MySequenceFile Writer.
-   * 
-   * @param fs
-   *          The configured filesystem.
-   * @param conf
-   *          The configuration.
-   * @param file
-   *          The name of the file.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param compress
-   *          Compress data?
-   * @param blockCompress
-   *          Compress blocks?
-   * @param codec
-   *          The compression codec.
-   * @param progress
-   * @param metadata
-   *          The metadata of the file.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   * @throws IOException
-   */
-  private static Writer createWriter(FileSystem fs, Configuration conf, Path file, Class keyClass, Class valClass, boolean compress, boolean blockCompress,
-      CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
-    if (codec != null && (codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) {
-      throw new IllegalArgumentException("MySequenceFile doesn't work with " + "GzipCodec without native-hadoop code!");
-    }
-    
-    Writer writer = null;
-    
-    if (!compress) {
-      writer = new Writer(fs, conf, file, keyClass, valClass, progress, metadata);
-    } else if (compress && !blockCompress) {
-      writer = new RecordCompressWriter(fs, conf, file, keyClass, valClass, codec, progress, metadata);
-    } else {
-      writer = new BlockCompressWriter(fs, conf, file, keyClass, valClass, codec, progress, metadata);
-    }
-    
-    return writer;
-  }
-  
-  /**
-   * Construct the preferred type of 'raw' MySequenceFile Writer.
-   * 
-   * @param conf
-   *          The configuration.
-   * @param out
-   *          The stream on top which the writer is to be constructed.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param compressionType
-   *          The compression type.
-   * @param codec
-   *          The compression codec.
-   * @param metadata
-   *          The metadata of the file.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   */
-  public static Writer createWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionType compressionType,
-      CompressionCodec codec, Metadata metadata) throws IOException {
-    if ((codec instanceof GzipCodec) && !NativeCodeLoader.isNativeCodeLoaded() && !ZlibFactory.isNativeZlibLoaded(conf)) {
-      throw new IllegalArgumentException("MySequenceFile doesn't work with " + "GzipCodec without native-hadoop code!");
-    }
-    
-    Writer writer = null;
-    
-    if (compressionType == CompressionType.NONE) {
-      writer = new Writer(conf, out, keyClass, valClass, metadata);
-    } else if (compressionType == CompressionType.RECORD) {
-      writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
-    } else if (compressionType == CompressionType.BLOCK) {
-      writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
-    }
-    
-    return writer;
-  }
-  
-  /**
-   * Construct the preferred type of 'raw' MySequenceFile Writer.
-   * 
-   * @param conf
-   *          The configuration.
-   * @param out
-   *          The stream on top which the writer is to be constructed.
-   * @param keyClass
-   *          The 'key' type.
-   * @param valClass
-   *          The 'value' type.
-   * @param compressionType
-   *          The compression type.
-   * @param codec
-   *          The compression codec.
-   * @return Returns the handle to the constructed MySequenceFile Writer.
-   */
-  public static Writer createWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionType compressionType,
-      CompressionCodec codec) throws IOException {
-    Writer writer = createWriter(conf, out, keyClass, valClass, compressionType, codec, new Metadata());
-    return writer;
-  }
-  
   /** The interface to 'raw' values of SequenceFiles. */
   public static interface ValueBytes {
     
@@ -801,534 +440,6 @@ public class MySequenceFile {
     }
   }
   
-  /** Write key/value pairs to a sequence-format file. */
-  public static class Writer implements java.io.Closeable {
-    Configuration conf;
-    FSDataOutputStream out;
-    boolean ownOutputStream = true;
-    DataOutputBuffer buffer = new DataOutputBuffer();
-    
-    Class keyClass;
-    Class valClass;
-    
-    private boolean compress;
-    CompressionCodec codec = null;
-    CompressionOutputStream deflateFilter = null;
-    DataOutputStream deflateOut = null;
-    Metadata metadata = null;
-    Compressor compressor = null;
-    
-    protected Serializer keySerializer;
-    protected Serializer uncompressedValSerializer;
-    protected Serializer compressedValSerializer;
-    
-    // Insert a globally unique 16-byte value every few entries, so that one
-    // can seek into the middle of a file and then synchronize with record
-    // starts and ends by scanning for this value.
-    long lastSyncPos; // position of last sync
-    byte[] sync; // 16 random bytes
-    {
-      try {
-        MessageDigest digester = MessageDigest.getInstance("MD5");
-        long time = System.currentTimeMillis();
-        digester.update((new UID() + "@" + time).getBytes());
-        sync = digester.digest();
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-    
-    /** Implicit constructor: needed for the period of transition! */
-    Writer() {}
-    
-    /** Create the named file. */
-    public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass) throws IOException {
-      this(fs, conf, name, keyClass, valClass, null, new Metadata());
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, Progressable progress, Metadata metadata) throws IOException {
-      this(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), fs.getDefaultBlockSize(),
-          progress, metadata);
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public Writer(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication, long blockSize,
-        Progressable progress, Metadata metadata) throws IOException {
-      init(name, conf, fs.create(name, true, bufferSize, replication, blockSize, progress), keyClass, valClass, false, null, metadata);
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-    
-    /** Write to an arbitrary stream using a specified buffer size. */
-    private Writer(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, Metadata metadata) throws IOException {
-      this.ownOutputStream = false;
-      init(null, conf, out, keyClass, valClass, false, null, metadata);
-      
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-    
-    /** Write the initial part of file header. */
-    void initializeFileHeader() throws IOException {
-      out.write(VERSION);
-    }
-    
-    /** Write the final part of file header. */
-    void finalizeFileHeader() throws IOException {
-      out.write(sync); // write the sync bytes
-      out.flush(); // flush header
-    }
-    
-    boolean isCompressed() {
-      return compress;
-    }
-    
-    boolean isBlockCompressed() {
-      return false;
-    }
-    
-    /** Write and flush the file header. */
-    void writeFileHeader() throws IOException {
-      Text.writeString(out, keyClass.getName());
-      Text.writeString(out, valClass.getName());
-      
-      out.writeBoolean(this.isCompressed());
-      out.writeBoolean(this.isBlockCompressed());
-      
-      if (this.isCompressed()) {
-        Text.writeString(out, (codec.getClass()).getName());
-      }
-      this.metadata.write(out);
-    }
-    
-    /** Initialize. */
-    
-    void init(Path name, Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, CompressionCodec codec, Metadata metadata)
-        throws IOException {
-      this.conf = conf;
-      this.out = out;
-      this.keyClass = keyClass;
-      this.valClass = valClass;
-      this.compress = compress;
-      this.codec = codec;
-      this.metadata = metadata;
-      SerializationFactory serializationFactory = new SerializationFactory(conf);
-      this.keySerializer = serializationFactory.getSerializer(keyClass);
-      this.keySerializer.open(buffer);
-      this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
-      this.uncompressedValSerializer.open(buffer);
-      if (this.codec != null) {
-        ReflectionUtils.setConf(this.codec, this.conf);
-        this.compressor = CodecPool.getCompressor(this.codec);
-        this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
-        this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
-        this.compressedValSerializer = serializationFactory.getSerializer(valClass);
-        this.compressedValSerializer.open(deflateOut);
-      }
-    }
-    
-    /** Returns the class of keys in this file. */
-    public Class getKeyClass() {
-      return keyClass;
-    }
-    
-    /** Returns the class of values in this file. */
-    public Class getValueClass() {
-      return valClass;
-    }
-    
-    /** Returns the compression codec of data in this file. */
-    public CompressionCodec getCompressionCodec() {
-      return codec;
-    }
-    
-    /** create a sync point */
-    public void sync() throws IOException {
-      if (sync != null && lastSyncPos != out.getPos()) {
-        out.writeInt(SYNC_ESCAPE); // mark the start of the sync
-        out.write(sync); // write sync
-        lastSyncPos = out.getPos(); // update lastSyncPos
-      }
-    }
-    
-    /** Returns the configuration of this file. */
-    Configuration getConf() {
-      return conf;
-    }
-    
-    /** Close the file. */
-    public synchronized void close() throws IOException {
-      keySerializer.close();
-      uncompressedValSerializer.close();
-      if (compressedValSerializer != null) {
-        compressedValSerializer.close();
-      }
-      
-      CodecPool.returnCompressor(compressor);
-      compressor = null;
-      
-      if (out != null) {
-        
-        // Close the underlying stream iff we own it...
-        if (ownOutputStream) {
-          out.close();
-        } else {
-          out.flush();
-        }
-        out = null;
-      }
-    }
-    
-    synchronized void checkAndWriteSync() throws IOException {
-      if (sync != null && out.getPos() >= lastSyncPos + SYNC_INTERVAL) { // time to emit sync
-        sync();
-      }
-    }
-    
-    /** Append a key/value pair. */
-    public synchronized void append(Writable key, Writable val) throws IOException {
-      append((Object) key, (Object) val);
-    }
-    
-    /** Append a key/value pair. */
-    
-    public synchronized void append(Object key, Object val) throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: " + key.getClass().getName() + " is not " + keyClass);
-      if (val.getClass() != valClass)
-        throw new IOException("wrong value class: " + val.getClass().getName() + " is not " + valClass);
-      
-      buffer.reset();
-      
-      // Append the 'key'
-      keySerializer.serialize(key);
-      int keyLength = buffer.getLength();
-      if (keyLength < 0)
-        throw new IOException("negative length keys not allowed: " + key);
-      
-      // Append the 'value'
-      if (compress) {
-        deflateFilter.resetState();
-        compressedValSerializer.serialize(val);
-        deflateOut.flush();
-        deflateFilter.finish();
-      } else {
-        uncompressedValSerializer.serialize(val);
-      }
-      
-      // Write the record out
-      checkAndWriteSync(); // sync
-      out.writeInt(buffer.getLength()); // total record length
-      out.writeInt(keyLength); // key portion length
-      out.write(buffer.getData(), 0, buffer.getLength()); // data
-    }
-    
-    public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
-      if (keyLength < 0)
-        throw new IOException("negative length keys not allowed: " + keyLength);
-      
-      int valLength = val.getSize();
-      
-      checkAndWriteSync();
-      
-      out.writeInt(keyLength + valLength); // total record length
-      out.writeInt(keyLength); // key portion length
-      out.write(keyData, keyOffset, keyLength); // key
-      val.writeUncompressedBytes(out); // value
-    }
-    
-    /**
-     * Returns the current length of the output file.
-     * 
-     * <p>
-     * This always returns a synchronized position. In other words, immediately after calling {@link MySequenceFile.Reader#seek(long)} with a position returned
-     * by this method, {@link MySequenceFile.Reader#next(Writable)} may be called. However the key may be earlier in the file than key last written when this
-     * method was called (e.g., with block-compression, it may be the first key in the block that was being written when this method was called).
-     */
-    public synchronized long getLength() throws IOException {
-      return out.getPos();
-    }
-    
-  } // class Writer
-  
-  /** Write key/compressed-value pairs to a sequence-format file. */
-  static class RecordCompressWriter extends Writer {
-    
-    /** Create the named file. */
-    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec) throws IOException {
-      this(conf, fs.create(name), keyClass, valClass, codec, new Metadata());
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress,
-        Metadata metadata) throws IOException {
-      this(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
-          progress, metadata);
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication,
-        long blockSize, CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
-      super.init(name, conf, fs.create(name, true, bufferSize, replication, blockSize, progress), keyClass, valClass, true, codec, metadata);
-      
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress)
-        throws IOException {
-      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
-    }
-    
-    /** Write to an arbitrary stream using a specified buffer size. */
-    private RecordCompressWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
-        throws IOException {
-      this.ownOutputStream = false;
-      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
-      
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-      
-    }
-    
-    boolean isCompressed() {
-      return true;
-    }
-    
-    boolean isBlockCompressed() {
-      return false;
-    }
-    
-    /** Append a key/value pair. */
-    
-    public synchronized void append(Object key, Object val) throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: " + key.getClass().getName() + " is not " + keyClass);
-      if (val.getClass() != valClass)
-        throw new IOException("wrong value class: " + val.getClass().getName() + " is not " + valClass);
-      
-      buffer.reset();
-      
-      // Append the 'key'
-      keySerializer.serialize(key);
-      int keyLength = buffer.getLength();
-      if (keyLength < 0)
-        throw new IOException("negative length keys not allowed: " + key);
-      
-      // Compress 'value' and append it
-      deflateFilter.resetState();
-      compressedValSerializer.serialize(val);
-      deflateOut.flush();
-      deflateFilter.finish();
-      
-      // Write the record out
-      checkAndWriteSync(); // sync
-      out.writeInt(buffer.getLength()); // total record length
-      out.writeInt(keyLength); // key portion length
-      out.write(buffer.getData(), 0, buffer.getLength()); // data
-    }
-    
-    /** Append a key/value pair. */
-    public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
-      
-      if (keyLength < 0)
-        throw new IOException("negative length keys not allowed: " + keyLength);
-      
-      int valLength = val.getSize();
-      
-      checkAndWriteSync(); // sync
-      out.writeInt(keyLength + valLength); // total record length
-      out.writeInt(keyLength); // key portion length
-      out.write(keyData, keyOffset, keyLength); // 'key' data
-      val.writeCompressedBytes(out); // 'value' data
-    }
-    
-  } // RecordCompressionWriter
-  
-  /** Write compressed key/value blocks to a sequence-format file. */
-  static class BlockCompressWriter extends Writer {
-    
-    private int noBufferedRecords = 0;
-    
-    private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
-    private DataOutputBuffer keyBuffer = new DataOutputBuffer();
-    
-    private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
-    private DataOutputBuffer valBuffer = new DataOutputBuffer();
-    
-    private int compressionBlockSize;
-    
-    /** Create the named file. */
-    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec) throws IOException {
-      this(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
-          null, new Metadata());
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress,
-        Metadata metadata) throws IOException {
-      this(fs, conf, name, keyClass, valClass, fs.getConf().getInt("io.file.buffer.size", 4096), fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
-          progress, metadata);
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, int bufferSize, short replication, long blockSize,
-        CompressionCodec codec, Progressable progress, Metadata metadata) throws IOException {
-      super.init(name, conf, fs.create(name, true, bufferSize, replication, blockSize, progress), keyClass, valClass, true, codec, metadata);
-      init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
-      
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-    
-    /** Create the named file with write-progress reporter. */
-    public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, Class keyClass, Class valClass, CompressionCodec codec, Progressable progress)
-        throws IOException {
-      this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
-    }
-    
-    /** Write to an arbitrary stream using a specified buffer size. */
-    private BlockCompressWriter(Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
-        throws IOException {
-      this.ownOutputStream = false;
-      super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
-      init(1000000);
-      
-      initializeFileHeader();
-      writeFileHeader();
-      finalizeFileHeader();
-    }
-    
-    boolean isCompressed() {
-      return true;
-    }
-    
-    boolean isBlockCompressed() {
-      return true;
-    }
-    
-    /** Initialize */
-    void init(int compressionBlockSize) throws IOException {
-      this.compressionBlockSize = compressionBlockSize;
-      keySerializer.close();
-      keySerializer.open(keyBuffer);
-      uncompressedValSerializer.close();
-      uncompressedValSerializer.open(valBuffer);
-    }
-    
-    /** Workhorse to check and write out compressed data/lengths */
-    private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException {
-      deflateFilter.resetState();
-      buffer.reset();
-      deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength());
-      deflateOut.flush();
-      deflateFilter.finish();
-      
-      WritableUtils.writeVInt(out, buffer.getLength());
-      out.write(buffer.getData(), 0, buffer.getLength());
-    }
-    
-    /** Compress and flush contents to dfs */
-    public synchronized void sync() throws IOException {
-      if (noBufferedRecords > 0) {
-        super.sync();
-        
-        // No. of records
-        WritableUtils.writeVInt(out, noBufferedRecords);
-        
-        // Write 'keys' and lengths
-        writeBuffer(keyLenBuffer);
-        writeBuffer(keyBuffer);
-        
-        // Write 'values' and lengths
-        writeBuffer(valLenBuffer);
-        writeBuffer(valBuffer);
-        
-        // Flush the file-stream
-        out.flush();
-        
-        // Reset internal states
-        keyLenBuffer.reset();
-        keyBuffer.reset();
-        valLenBuffer.reset();
-        valBuffer.reset();
-        noBufferedRecords = 0;
-      }
-      
-    }
-    
-    /** Close the file. */
-    public synchronized void close() throws IOException {
-      if (out != null) {
-        sync();
-      }
-      super.close();
-    }
-    
-    /** Append a key/value pair. */
-    
-    public synchronized void append(Object key, Object val) throws IOException {
-      if (key.getClass() != keyClass)
-        throw new IOException("wrong key class: " + key + " is not " + keyClass);
-      if (val.getClass() != valClass)
-        throw new IOException("wrong value class: " + val + " is not " + valClass);
-      
-      // Save key/value into respective buffers
-      int oldKeyLength = keyBuffer.getLength();
-      keySerializer.serialize(key);
-      int keyLength = keyBuffer.getLength() - oldKeyLength;
-      if (keyLength < 0)
-        throw new IOException("negative length keys not allowed: " + key);
-      WritableUtils.writeVInt(keyLenBuffer, keyLength);
-      
-      int oldValLength = valBuffer.getLength();
-      uncompressedValSerializer.serialize(val);
-      int valLength = valBuffer.getLength() - oldValLength;
-      WritableUtils.writeVInt(valLenBuffer, valLength);
-      
-      // Added another key/value pair
-      ++noBufferedRecords;
-      
-      // Compress and flush?
-      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
-      if (currentBlockSize >= compressionBlockSize) {
-        sync();
-      }
-    }
-    
-    /** Append a key/value pair. */
-    public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
-      
-      if (keyLength < 0)
-        throw new IOException("negative length keys not allowed");
-      
-      int valLength = val.getSize();
-      
-      // Save key/value data in relevant buffers
-      WritableUtils.writeVInt(keyLenBuffer, keyLength);
-      keyBuffer.write(keyData, keyOffset, keyLength);
-      WritableUtils.writeVInt(valLenBuffer, valLength);
-      val.writeUncompressedBytes(valBuffer);
-      
-      // Added another key/value pair
-      ++noBufferedRecords;
-      
-      // Compress and flush?
-      int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
-      if (currentBlockSize >= compressionBlockSize) {
-        sync();
-      }
-    }
-    
-  } // BlockCompressionWriter
-  
   /** Reads key/value pairs from a sequence-format file. */
   public static class Reader implements java.io.Closeable {
     private Path file;
@@ -2167,1008 +1278,4 @@ public class MySequenceFile {
     
   }
   
-  /**
-   * Sorts key/value pairs in a sequence-format file.
-   * 
-   * <p>
-   * For best performance, applications should make sure that the {@link Writable#readFields(DataInput)} implementation of their keys is very efficient. In
-   * particular, it should avoid allocating memory.
-   */
-  public static class Sorter {
-    
-    private RawComparator comparator;
-    
-    private MergeSort mergeSort; // the implementation of merge sort
-    
-    private Path[] inFiles; // when merging or sorting
-    
-    private Path outFile;
-    
-    private int memory; // bytes
-    private int factor; // merged per pass
-    
-    private FileSystem fs = null;
-    
-    private Class keyClass;
-    private Class valClass;
-    
-    private Configuration conf;
-    
-    private Progressable progressable = null;
-    
-    /** Sort and merge files containing the named classes. */
-    public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass, Class valClass, Configuration conf) {
-      this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
-    }
-    
-    /** Sort and merge using an arbitrary {@link RawComparator}. */
-    public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, Class valClass, Configuration conf) {
-      this.fs = fs;
-      this.comparator = comparator;
-      this.keyClass = keyClass;
-      this.valClass = valClass;
-      this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
-      this.factor = conf.getInt("io.sort.factor", 100);
-      this.conf = conf;
-    }
-    
-    /** Set the number of streams to merge at once. */
-    public void setFactor(int factor) {
-      this.factor = factor;
-    }
-    
-    /** Get the number of streams to merge at once. */
-    public int getFactor() {
-      return factor;
-    }
-    
-    /** Set the total amount of buffer memory, in bytes. */
-    public void setMemory(int memory) {
-      this.memory = memory;
-    }
-    
-    /** Get the total amount of buffer memory, in bytes. */
-    public int getMemory() {
-      return memory;
-    }
-    
-    /** Set the progressable object in order to report progress. */
-    public void setProgressable(Progressable progressable) {
-      this.progressable = progressable;
-    }
-    
-    /**
-     * Perform a file sort from a set of input files into an output file.
-     * 
-     * @param inFiles
-     *          the files to be sorted
-     * @param outFile
-     *          the sorted output file
-     * @param deleteInput
-     *          should the input files be deleted as they are read?
-     */
-    public void sort(Path[] inFiles, Path outFile, boolean deleteInput) throws IOException {
-      if (fs.exists(outFile)) {
-        throw new IOException("already exists: " + outFile);
-      }
-      
-      this.inFiles = inFiles;
-      this.outFile = outFile;
-      
-      int segments = sortPass(deleteInput);
-      if (segments > 1) {
-        mergePass(outFile.getParent());
-      }
-    }
-    
-    /**
-     * Perform a file sort from a set of input files and return an iterator.
-     * 
-     * @param inFiles
-     *          the files to be sorted
-     * @param tempDir
-     *          the directory where temp files are created during sort
-     * @param deleteInput
-     *          should the input files be deleted as they are read?
-     * @return iterator the RawKeyValueIterator
-     */
-    public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, boolean deleteInput) throws IOException {
-      Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
-      if (fs.exists(outFile)) {
-        throw new IOException("already exists: " + outFile);
-      }
-      this.inFiles = inFiles;
-      // outFile will basically be used as prefix for temp files in the cases
-      // where sort outputs multiple sorted segments. For the single segment
-      // case, the outputFile itself will contain the sorted data for that
-      // segment
-      this.outFile = outFile;
-      
-      int segments = sortPass(deleteInput);
-      if (segments > 1)
-        return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), tempDir);
-      else if (segments == 1)
-        return merge(new Path[] {outFile}, true, tempDir);
-      else
-        return null;
-    }
-    
-    /**
-     * The backwards compatible interface to sort.
-     * 
-     * @param inFile
-     *          the input file to sort
-     * @param outFile
-     *          the sorted output file
-     */
-    public void sort(Path inFile, Path outFile) throws IOException {
-      sort(new Path[] {inFile}, outFile, false);
-    }
-    
-    private int sortPass(boolean deleteInput) throws IOException {
-      LOG.debug("running sort pass");
-      SortPass sortPass = new SortPass(); // make the SortPass
-      sortPass.setProgressable(progressable);
-      mergeSort = new MergeSort(sortPass.new SeqFileComparator());
-      try {
-        return sortPass.run(deleteInput); // run it
-      } finally {
-        sortPass.close(); // close it
-      }
-    }
-    
-    private class SortPass {
-      private int memoryLimit = memory / 4;
-      private int recordLimit = 1000000;
-      
-      private DataOutputBuffer rawKeys = new DataOutputBuffer();
-      private byte[] rawBuffer;
-      
-      private int[] keyOffsets = new int[1024];
-      private int[] pointers = new int[keyOffsets.length];
-      private int[] pointersCopy = new int[keyOffsets.length];
-      private int[] keyLengths = new int[keyOffsets.length];
-      private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
-      
-      private ArrayList segmentLengths = new ArrayList();
-      
-      private Reader in = null;
-      private FSDataOutputStream out = null;
-      private FSDataOutputStream indexOut = null;
-      private Path outName;
-      
-      private Progressable progressable = null;
-      
-      public int run(boolean deleteInput) throws IOException {
-        int segments = 0;
-        int currentFile = 0;
-        boolean atEof = (currentFile >= inFiles.length);
-        boolean isCompressed = false;
-        boolean isBlockCompressed = false;
-        CompressionCodec codec = null;
-        segmentLengths.clear();
-        if (atEof) {
-          return 0;
-        }
-        
-        // Initialize
-        in = new Reader(fs, inFiles[currentFile], conf);
-        isCompressed = in.isCompressed();
-        isBlockCompressed = in.isBlockCompressed();
-        codec = in.getCompressionCodec();
-        
-        for (int i = 0; i < rawValues.length; ++i) {
-          rawValues[i] = null;
-        }
-        
-        while (!atEof) {
-          int count = 0;
-          int bytesProcessed = 0;
-          rawKeys.reset();
-          while (!atEof && bytesProcessed < memoryLimit && count < recordLimit) {
-            
-            // Read a record into buffer
-            // Note: Attempt to re-use 'rawValue' as far as possible
-            int keyOffset = rawKeys.getLength();
-            ValueBytes rawValue = (count == keyOffsets.length || rawValues[count] == null) ? in.createValueBytes() : rawValues[count];
-            int recordLength = in.nextRaw(rawKeys, rawValue);
-            if (recordLength == -1) {
-              in.close();
-              if (deleteInput) {
-                fs.delete(inFiles[currentFile], true);
-              }
-              currentFile += 1;
-              atEof = currentFile >= inFiles.length;
-              if (!atEof) {
-                in = new Reader(fs, inFiles[currentFile], conf);
-              } else {
-                in = null;
-              }
-              continue;
-            }
-            
-            int keyLength = rawKeys.getLength() - keyOffset;
-            
-            if (count == keyOffsets.length)
-              grow();
-            
-            keyOffsets[count] = keyOffset; // update pointers
-            pointers[count] = count;
-            keyLengths[count] = keyLength;
-            rawValues[count] = rawValue;
-            
-            bytesProcessed += recordLength;
-            count++;
-          }
-          
-          // buffer is full -- sort & flush it
-          LOG.debug("flushing segment " + segments);
-          rawBuffer = rawKeys.getData();
-          sort(count);
-          // indicate we're making progress
-          if (progressable != null) {
-            progressable.progress();
-          }
-          flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, segments == 0 && atEof);
-          segments++;
-        }
-        return segments;
-      }
-      
-      public void close() throws IOException {
-        if (in != null) {
-          in.close();
-        }
-        if (out != null) {
-          out.close();
-        }
-        if (indexOut != null) {
-          indexOut.close();
-        }
-      }
-      
-      private void grow() {
-        int newLength = keyOffsets.length * 3 / 2;
-        keyOffsets = grow(keyOffsets, newLength);
-        pointers = grow(pointers, newLength);
-        pointersCopy = new int[newLength];
-        keyLengths = grow(keyLengths, newLength);
-        rawValues = grow(rawValues, newLength);
-      }
-      
-      private int[] grow(int[] old, int newLength) {
-        int[] result = new int[newLength];
-        System.arraycopy(old, 0, result, 0, old.length);
-        return result;
-      }
-      
-      private ValueBytes[] grow(ValueBytes[] old, int newLength) {
-        ValueBytes[] result = new ValueBytes[newLength];
-        System.arraycopy(old, 0, result, 0, old.length);
-        for (int i = old.length; i < newLength; ++i) {
-          result[i] = null;
-        }
-        return result;
-      }
-      
-      private void flush(int count, int bytesProcessed, boolean isCompressed, boolean isBlockCompressed, CompressionCodec codec, boolean done)
-          throws IOException {
-        if (out == null) {
-          outName = done ? outFile : outFile.suffix(".0");
-          out = fs.create(outName);
-          if (!done) {
-            indexOut = fs.create(outName.suffix(".index"));
-          }
-        }
-        
-        long segmentStart = out.getPos();
-        Writer writer = createWriter(conf, out, keyClass, valClass, isCompressed, isBlockCompressed, codec, new Metadata());
-        
-        if (!done) {
-          writer.sync = null; // disable sync on temp files
-        }
-        
-        for (int i = 0; i < count; i++) { // write in sorted order
-          int p = pointers[i];
-          writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
-        }
-        writer.close();
-        
-        if (!done) {
-          // Save the segment length
-          WritableUtils.writeVLong(indexOut, segmentStart);
-          WritableUtils.writeVLong(indexOut, (out.getPos() - segmentStart));
-          indexOut.flush();
-        }
-      }
-      
-      private void sort(int count) {
-        System.arraycopy(pointers, 0, pointersCopy, 0, count);
-        mergeSort.mergeSort(pointersCopy, pointers, 0, count);
-      }
-      
-      class SeqFileComparator implements Comparator<IntWritable> {
-        public int compare(IntWritable I, IntWritable J) {
-          return comparator.compare(rawBuffer, keyOffsets[I.get()], keyLengths[I.get()], rawBuffer, keyOffsets[J.get()], keyLengths[J.get()]);
-        }
-      }
-      
-      /** set the progressable object in order to report progress */
-      public void setProgressable(Progressable progressable) {
-        this.progressable = progressable;
-      }
-      
-    } // MySequenceFile.Sorter.SortPass
-    
-    /** The interface to iterate over raw keys/values of SequenceFiles. */
-    public static interface RawKeyValueIterator {
-      /**
-       * Gets the current raw key
-       * 
-       * @return DataOutputBuffer
-       * @throws IOException
-       */
-      DataOutputBuffer getKey() throws IOException;
-      
-      /**
-       * Gets the current raw value
-       * 
-       * @return ValueBytes
-       * @throws IOException
-       */
-      ValueBytes getValue() throws IOException;
-      
-      /**
-       * Sets up the current key and value (for getKey and getValue)
-       * 
-       * @return true if there exists a key/value, false otherwise
-       * @throws IOException
-       */
-      boolean next() throws IOException;
-      
-      /**
-       * closes the iterator so that the underlying streams can be closed
-       * 
-       * @throws IOException
-       */
-      void close() throws IOException;
-      
-      /**
-       * Gets the Progress object; this has a float (0.0 - 1.0) indicating the bytes processed by the iterator so far
-       */
-      Progress getProgress();
-    }
-    
-    /**
-     * Merges the list of segments of type <code>SegmentDescriptor</code>
-     * 
-     * @param segments
-     *          the list of SegmentDescriptors
-     * @param tmpDir
-     *          the directory to write temporary files into
-     * @return RawKeyValueIterator
-     */
-    public RawKeyValueIterator merge(List<SegmentDescriptor> segments, Path tmpDir) throws IOException {
-      // pass in object to report progress, if present
-      MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
-      return mQueue.merge();
-    }
-    
-    /**
-     * Merges the contents of files passed in Path[] using a max factor value that is already set
-     * 
-     * @param inNames
-     *          the array of path names
-     * @param deleteInputs
-     *          true if the input files should be deleted when unnecessary
-     * @param tmpDir
-     *          the directory to write temporary files into
-     * @return RawKeyValueIteratorMergeQueue
-     */
-    public RawKeyValueIterator merge(Path[] inNames, boolean deleteInputs, Path tmpDir) throws IOException {
-      return merge(inNames, deleteInputs, (inNames.length < factor) ? inNames.length : factor, tmpDir);
-    }
-    
-    /**
-     * Merges the contents of files passed in Path[]
-     * 
-     * @param inNames
-     *          the array of path names
-     * @param deleteInputs
-     *          true if the input files should be deleted when unnecessary
-     * @param factor
-     *          the factor that will be used as the maximum merge fan-in
-     * @param tmpDir
-     *          the directory to write temporary files into
-     * @return RawKeyValueIteratorMergeQueue
-     */
-    public RawKeyValueIterator merge(Path[] inNames, boolean deleteInputs, int factor, Path tmpDir) throws IOException {
-      // get the segments from inNames
-      ArrayList<SegmentDescriptor> a = new ArrayList<SegmentDescriptor>();
-      for (int i = 0; i < inNames.length; i++) {
-        SegmentDescriptor s = new SegmentDescriptor(0, fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
-        s.preserveInput(!deleteInputs);
-        s.doSync();
-        a.add(s);
-      }
-      this.factor = factor;
-      MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
-      return mQueue.merge();
-    }
-    
-    /**
-     * Merges the contents of files passed in Path[]
-     * 
-     * @param inNames
-     *          the array of path names
-     * @param tempDir
-     *          the directory for creating temp files during merge
-     * @param deleteInputs
-     *          true if the input files should be deleted when unnecessary
-     * @return RawKeyValueIteratorMergeQueue
-     */
-    public RawKeyValueIterator merge(Path[] inNames, Path tempDir, boolean deleteInputs) throws IOException {
-      // outFile will basically be used as prefix for temp files for the
-      // intermediate merge outputs
-      this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
-      // get the segments from inNames
-      ArrayList<SegmentDescriptor> a = new ArrayList<SegmentDescriptor>();
-      for (int i = 0; i < inNames.length; i++) {
-        SegmentDescriptor s = new SegmentDescriptor(0, fs.getFileStatus(inNames[i]).getLen(), inNames[i]);
-        s.preserveInput(!deleteInputs);
-        s.doSync();
-        a.add(s);
-      }
-      factor = (inNames.length < factor) ? inNames.length : factor;
-      // pass in object to report progress, if present
-      MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
-      return mQueue.merge();
-    }
-    
-    /**
-     * Clones the attributes (like compression of the input file and creates a corresponding Writer
-     * 
-     * @param inputFile
-     *          the path of the input file whose attributes should be cloned
-     * @param outputFile
-     *          the path of the output file
-     * @param prog
-     *          the Progressable to report status during the file write
-     * @return Writer
-     */
-    public Writer cloneFileAttributes(Path inputFile, Path outputFile, Progressable prog) throws IOException {
-      FileSystem srcFileSys = inputFile.getFileSystem(conf);
-      Reader reader = new Reader(srcFileSys, inputFile, 4096, conf, true);
-      boolean compress = reader.isCompressed();
-      boolean blockCompress = reader.isBlockCompressed();
-      CompressionCodec codec = reader.getCompressionCodec();
-      reader.close();
-      
-      Writer writer = createWriter(outputFile.getFileSystem(conf), conf, outputFile, keyClass, valClass, compress, blockCompress, codec, prog, new Metadata());
-      return writer;
-    }
-    
-    /**
-     * Writes records from RawKeyValueIterator into a file represented by the passed writer
-     * 
-     * @param records
-     *          the RawKeyValueIterator
-     * @param writer
-     *          the Writer created earlier
-     */
-    public void writeFile(RawKeyValueIterator records, Writer writer) throws IOException {
-      while (records.next()) {
-        writer.appendRaw(records.getKey().getData(), 0, records.getKey().getLength(), records.getValue());
-      }
-      writer.sync();
-    }
-    
-    /**
-     * Merge the provided files.
-     * 
-     * @param inFiles
-     *          the array of input path names
-     * @param outFile
-     *          the final output file
-     */
-    public void merge(Path[] inFiles, Path outFile) throws IOException {
-      if (fs.exists(outFile)) {
-        throw new IOException("already exists: " + outFile);
-      }
-      RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
-      Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
-      
-      writeFile(r, writer);
-      
-      writer.close();
-    }
-    
-    /** sort calls this to generate the final merged output */
-    private int mergePass(Path tmpDir) throws IOException {
-      LOG.debug("running merge pass");
-      Writer writer = cloneFileAttributes(outFile.suffix(".0"), outFile, null);
-      RawKeyValueIterator r = merge(outFile.suffix(".0"), outFile.suffix(".0.index"), tmpDir);
-      writeFile(r, writer);
-      
-      writer.close();
-      return 0;
-    }
-    
-    /**
-     * Used by mergePass to merge the output of the sort
-     * 
-     * @param inName
-     *          the name of the input file containing sorted segments
-     * @param indexIn
-     *          the offsets of the sorted segments
-     * @param tmpDir
-     *          the relative directory to store intermediate results in
-     * @return RawKeyValueIterator
-     * @throws IOException
-     */
-    private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) throws IOException {
-      // get the segments from indexIn
-      // we create a SegmentContainer so that we can track segments belonging to
-      // inName and delete inName as soon as we see that we have looked at all
-      // the contained segments during the merge process & hence don't need
-      // them anymore
-      SegmentContainer container = new SegmentContainer(inName, indexIn);
-      MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
-      return mQueue.merge();
-    }
-    
-    /** This class implements the core of the merge logic */
-    private class MergeQueue extends PriorityQueue implements RawKeyValueIterator {
-      private boolean compress;
-      private boolean blockCompress;
-      private DataOutputBuffer rawKey = new DataOutputBuffer();
-      private ValueBytes rawValue;
-      private long totalBytesProcessed;
-      private float progPerByte;
-      private Progress mergeProgress = new Progress();
-      private Path tmpDir;
-      private Progressable progress = null; // handle to the progress reporting object
-      private SegmentDescriptor minSegment;
-      
-      // a TreeMap used to store the segments sorted by size (segment offset and
-      // segment path name is used to break ties between segments of same sizes)
-      private Map<SegmentDescriptor,Void> sortedSegmentSizes = new TreeMap<SegmentDescriptor,Void>();
-      
-      public void put(SegmentDescriptor stream) throws IOException {
-        if (size() == 0) {
-          compress = stream.in.isCompressed();
-          blockCompress = stream.in.isBlockCompressed();
-        } else if (compress != stream.in.isCompressed() || blockCompress != stream.in.isBlockCompressed()) {
-          throw new IOException("All merged files must be compressed or not.");
-        }
-        super.put(stream);
-      }
-      
-      /**
-       * A queue of file segments to merge
-       * 
-       * @param segments
-       *          the file segments to merge
-       * @param tmpDir
-       *          a relative local directory to save intermediate files in
-       * @param progress
-       *          the reference to the Progressable object
-       */
-      public MergeQueue(List<SegmentDescriptor> segments, Path tmpDir, Progressable progress) {
-        int size = segments.size();
-        for (int i = 0; i < size; i++) {
-          sortedSegmentSizes.put(segments.get(i), null);
-        }
-        this.tmpDir = tmpDir;
-        this.progress = progress;
-      }
-      
-      protected boolean lessThan(Object a, Object b) {
-        // indicate we're making progress
-        if (progress != null) {
-          progress.progress();
-        }
-        SegmentDescriptor msa = (SegmentDescriptor) a;
-        SegmentDescriptor msb = (SegmentDescriptor) b;
-        return comparator.compare(msa.getKey().getData(), 0, msa.getKey().getLength(), msb.getKey().getData(), 0, msb.getKey().getLength()) < 0;
-      }
-      
-      public void close() throws IOException {
-        SegmentDescriptor ms; // close inputs
-        while ((ms = (SegmentDescriptor) pop()) != null) {
-          ms.cleanup();
-        }
-        minSegment = null;
-      }
-      
-      public DataOutputBuffer getKey() throws IOException {
-        return rawKey;
-      }
-      
-      public ValueBytes getValue() throws IOException {
-        return rawValue;
-      }
-      
-      public boolean next() throws IOException {
-        if (size() == 0)
-          return false;
-        if (minSegment != null) {
-          // minSegment is non-null for all invocations of next except the first
-          // one. For the first invocation, the priority queue is ready for use
-          // but for the subsequent invocations, first adjust the queue
-          adjustPriorityQueue(minSegment);
-          if (size() == 0) {
-            minSegment = null;
-            return false;
-          }
-        }
-        minSegment = (SegmentDescriptor) top();
-        long startPos = minSegment.in.getPosition(); // Current position in stream
-        // save the raw key reference
-        rawKey = minSegment.getKey();
-        // load the raw value. Re-use the existing rawValue buffer
-        if (rawValue == null) {
-          rawValue = minSegment.in.createValueBytes();
-        }
-        minSegment.nextRawValue(rawValue);
-        long endPos = minSegment.in.getPosition(); // End position after reading value
-        updateProgress(endPos - startPos);
-        return true;
-      }
-      
-      public Progress getProgress() {
-        return mergeProgress;
-      }
-      
-      private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException {
-        long startPos = ms.in.getPosition(); // Current position in stream
-        boolean hasNext = ms.nextRawKey();
-        long endPos = ms.in.getPosition(); // End position after reading key
-        updateProgress(endPos - startPos);
-        if (hasNext) {
-          adjustTop();
-        } else {
-          pop();
-          ms.cleanup();
-        }
-      }
-      
-      private void updateProgress(long bytesProcessed) {
-        totalBytesProcessed += bytesProcessed;
-        if (progPerByte > 0) {
-          mergeProgress.set(totalBytesProcessed * progPerByte);
-        }
-      }
-      
-      /**
-       * This is the single level merge that is called multiple times depending on the factor size and the number of segments
-       * 
-       * @return RawKeyValueIterator
-       */
-      public RawKeyValueIterator merge() throws IOException {
-        // create the MergeStreams from the sorted map created in the constructor
-        // and dump the final output to a file
-        int numSegments = sortedSegmentSizes.size();
-        int origFactor = factor;
-        int passNo = 1;
-        LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
-        do {
-          // get the factor for this pass of merge
-          factor = getPassFactor(passNo, numSegments);
-          List<SegmentDescriptor> segmentsToMerge = new ArrayList<SegmentDescriptor>();
-          int segmentsConsidered = 0;
-          int numSegmentsToConsider = factor;
-          while (true) {
-            // extract the smallest 'factor' number of segment pointers from the
-            // TreeMap. Call cleanup on the empty segments (no key/value data)
-            SegmentDescriptor[] mStream = getSegmentDescriptors(numSegmentsToConsider);
-            for (int i = 0; i < mStream.length; i++) {
-              if (mStream[i].nextRawKey()) {
-                segmentsToMerge.add(mStream[i]);
-                segmentsConsidered++;
-                // Count the fact that we read some bytes in calling nextRawKey()
-                updateProgress(mStream[i].in.getPosition());
-              } else {
-                mStream[i].cleanup();
-                numSegments--; // we ignore this segment for the merge
-              }
-            }
-            // if we have the desired number of segments
-            // or looked at all available segments, we break
-            if (segmentsConsidered == factor || sortedSegmentSizes.size() == 0) {
-              break;
-            }
-            
-            numSegmentsToConsider = factor - segmentsConsidered;
-          }
-          // feed the streams to the priority queue
-          initialize(segmentsToMerge.size());
-          clear();
-          for (int i = 0; i < segmentsToMerge.size(); i++) {
-            put(segmentsToMerge.get(i));
-          }
-          // if we have lesser number of segments remaining, then just return the
-          // iterator, else do another single level merge
-          if (numSegments <= factor) {
-            // calculate the length of the remaining segments. Required for
-            // calculating the merge progress
-            long totalBytes = 0;
-            for (int i = 0; i < segmentsToMerge.size(); i++) {
-              totalBytes += segmentsToMerge.get(i).segmentLength;
-            }
-            if (totalBytes != 0) // being paranoid
-              progPerByte = 1.0f / (float) totalBytes;
-            // reset factor to what it originally was
-            factor = origFactor;
-            return this;
-          }
-          // we want to spread the creation of temp files on multiple disks if
-          // available under the space constraints
-          long approxOutputSize = 0;
-          for (SegmentDescriptor s : segmentsToMerge) {
-            approxOutputSize += s.segmentLength + ChecksumFileSystem.getApproxChkSumLength(s.segmentLength);
-          }
-          Path tmpFilename = new Path(tmpDir, "intermediate").suffix("." + passNo);
-          
-          Path outputFile = lDirAlloc.getLocalPathForWrite(tmpFilename.toString(), approxOutputSize, conf);
-          LOG.debug("writing intermediate results to " + outputFile);
-          Writer writer = cloneFileAttributes(fs.makeQualified(segmentsToMerge.get(0).segmentPathName), fs.makeQualified(outputFile), null);
-          writer.sync = null; // disable sync for temp files
-          writeFile(this, writer);
-          writer.close();
-          
-          // we finished one single level merge; now clean up the priority
-          // queue
-          this.close();
-          
-          SegmentDescriptor tempSegment = new SegmentDescriptor(0, fs.getFileStatus(outputFile).getLen(), outputFile);
-          // put the segment back in the TreeMap
-          sortedSegmentSizes.put(tempSegment, null);
-          numSegments = sortedSegmentSizes.size();
-          passNo++;
-          // we are worried about only the first pass merge factor. So reset the
-          // factor to what it originally was
-          factor = origFactor;
-        } while (true);
-      }
-      
-      // Hadoop-591
-      public int getPassFactor(int passNo, int numSegments) {
-        if (passNo > 1 || numSegments <= factor || factor == 1)
-          return factor;
-        int mod = (numSegments - 1) % (factor - 1);
-        if (mod == 0)
-          return factor;
-        return mod + 1;
-      }
-      
-      /**
-       * Return (& remove) the requested number of segment descriptors from the sorted map.
-       */
-      public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
-        if (numDescriptors > sortedSegmentSizes.size())
-          numDescriptors = sortedSegmentSizes.size();
-        SegmentDescriptor[] SegmentDescriptors = new SegmentDescriptor[numDescriptors];
-        Iterator iter = sortedSegmentSizes.keySet().iterator();
-        int i = 0;
-        while (i < numDescriptors) {
-          SegmentDescriptors[i++] = (SegmentDescriptor) iter.next();
-          iter.remove();
-        }
-        return SegmentDescriptors;
-      }
-    } // MySequenceFile.Sorter.MergeQueue
-    
-    /**
-     * This class defines a merge segment. This class can be subclassed to provide a customized cleanup method implementation. In this implementation, cleanup
-     * closes the file handle and deletes the file
-     */
-    public class SegmentDescriptor implements Comparable {
-      
-      long segmentOffset; // the start of the segment in the file
-      long segmentLength; // the length of the segment
-      Path segmentPathName; // the path name of the file containing the segment
-      boolean ignoreSync = true; // set to true for temp files
-      private Reader in = null;
-      private DataOutputBuffer rawKey = null; // this will hold the current key
-      private boolean preserveInput = false; // delete input segment files?
-      
-      /**
-       * Constructs a segment
-       * 
-       * @param segmentOffset
-       *          the offset of the segment in the file
-       * @param segmentLength
-       *          the length of the segment
-       * @param segmentPathName
-       *          the path name of the file containing the segment
-       */
-      public SegmentDescriptor(long segmentOffset, long segmentLength, Path segmentPathName) {
-        this.segmentOffset = segmentOffset;
-        this.segmentLength = segmentLength;
-        this.segmentPathName = segmentPathName;
-      }
-      
-      /** Do the sync checks */
-      public void doSync() {
-        ignoreSync = false;
-      }
-      
-      /** Whether to delete the files when no longer needed */
-      public void preserveInput(boolean preserve) {
-        preserveInput = preserve;
-      }
-      
-      public boolean shouldPreserveInput() {
-        return preserveInput;
-      }
-      
-      public int compareTo(Object o) {
-        SegmentDescriptor that = (SegmentDescriptor) o;
-        if (this.segmentLength != that.segmentLength) {
-          return (this.segmentLength < that.segmentLength ? -1 : 1);
-        }
-        if (this.segmentOffset != that.segmentOffset) {
-          return (this.segmentOffset < that.segmentOffset ? -1 : 1);
-        }
-        return (this.segmentPathName.toString()).compareTo(that.segmentPathName.toString());
-      }
-      
-      public boolean equals(Object o) {
-        if (!(o instanceof SegmentDescriptor)) {
-          return false;
-        }
-        SegmentDescriptor that = (SegmentDescriptor) o;
-        if (this.segmentLength == that.segmentLength && this.segmentOffset == that.segmentOffset
-            && this.segmentPathName.toString().equals(that.segmentPathName.toString())) {
-          return true;
-        }
-        return false;
-      }
-      
-      public int hashCode() {
-        return 37 * 17 + (int) (segmentOffset ^ (segmentOffset >>> 32));
-      }
-      
-      /**
-       * Fills up the rawKey object with the key returned by the Reader
-       * 
-       * @return true if there is a key returned; false, otherwise
-       */
-      public boolean nextRawKey() throws IOException {
-        if (in == null) {
-          int bufferSize = conf.getInt("io.file.buffer.size", 4096);
-          if (fs.getUri().getScheme().startsWith("ramfs")) {
-            bufferSize = conf.getInt("io.bytes.per.checksum", 512);
-          }
-          Reader reader = new Reader(fs, segmentPathName, bufferSize, segmentOffset, segmentLength, conf, false);
-          
-          // sometimes we ignore syncs especially for temp merge files
-          if (ignoreSync)
-            reader.sync = null;
-          
-          if (reader.getKeyClass() != keyClass)
-            throw new IOException("wrong key class: " + reader.getKeyClass() + " is not " + keyClass);
-          if (reader.getValueClass() != valClass)
-            throw new IOException("wrong value class: " + reader.getValueClass() + " is not " + valClass);
-          this.in = reader;
-          rawKey = new DataOutputBuffer();
-        }
-        rawKey.reset();
-        int keyLength = in.nextRawKey(rawKey);
-        return (keyLength >= 0);
-      }
-      
-      /**
-       * Fills up the passed rawValue with the value corresponding to the key read earlier
-       * 
-       * @return the length of the value
-       */
-      public int nextRawValue(ValueBytes rawValue) throws IOException {
-        int valLength = in.nextRawValue(rawValue);
-        return valLength;
-      }
-      
-      /** Returns the stored rawKey */
-      public DataOutputBuffer getKey() {
-        return rawKey;
-      }
-      
-      /** closes the underlying reader */
-      private void close() throws IOException {
-        this.in.close();
-        this.in = null;
-      }
-      
-      /**
-       * The default cleanup. Subclasses can override this with a custom cleanup
-       */
-      public void cleanup() throws IOException {
-        close();
-        if (!preserveInput) {
-          fs.delete(segmentPathName, true);
-        }
-      }
-    } // MySequenceFile.Sorter.SegmentDescriptor
-    
-    /**
-     * This class provisions multiple segments contained within a single file
-     */
-    private class LinkedSegmentsDescriptor extends SegmentDescriptor {
-      
-      SegmentContainer parentContainer = null;
-      
-      /**
-       * Constructs a segment
-       * 
-       * @param segmentOffset
-       *          the offset of the segment in the file
-       * @param segmentLength
-       *          the length of the segment
-       * @param segmentPathName
-       *          the path name of the file containing the segment
-       * @param parent
-       *          the parent SegmentContainer that holds the segment
-       */
-      public LinkedSegmentsDescriptor(long segmentOffset, long segmentLength, Path segmentPathName, SegmentContainer parent) {
-        super(segmentOffset, segmentLength, segmentPathName);
-        this.parentContainer = parent;
-      }
-      
-      /**
-       * The default cleanup. Subclasses can override this with a custom cleanup
-       */
-      public void cleanup() throws IOException {
-        super.close();
-        if (super.shouldPreserveInput())
-          return;
-        parentContainer.cleanup();
-      }
-    } // MySequenceFile.Sorter.LinkedSegmentsDescriptor
-    
-    /**
-     * The class that defines a container for segments to be merged. Primarily required to delete temp files as soon as all the contained segments have been
-     * looked at
-     */
-    private class SegmentContainer {
-      private int numSegmentsCleanedUp = 0; // track the no. of segment cleanups
-      private int numSegmentsContained; // # of segments contained
-      private Path inName; // input file from where segments are created
-      
-      // the list of segments read from the file
-      private ArrayList<SegmentDescriptor> segments = new ArrayList<SegmentDescriptor>();
-      
-      /**
-       * This constructor is there primarily to serve the sort routine that generates a single output file with an associated index file
-       */
-      public SegmentContainer(Path inName, Path indexIn) throws IOException {
-        // get the segments from indexIn
-        FSDataInputStream fsIndexIn = fs.open(indexIn);
-        long end = fs.getFileStatus(indexIn).getLen();
-        while (fsIndexIn.getPos() < end) {
-          long segmentOffset = WritableUtils.readVLong(fsIndexIn);
-          long segmentLength = WritableUtils.readVLong(fsIndexIn);
-          Path segmentName = inName;
-          segments.add(new LinkedSegmentsDescriptor(segmentOffset, segmentLength, segmentName, this));
-        }
-        fsIndexIn.close();
-        fs.delete(indexIn, true);
-        numSegmentsContained = segments.size();
-        this.inName = inName;
-      }
-      
-      public List<SegmentDescriptor> getSegmentList() {
-        return segments;
-      }
-      
-      public void cleanup() throws IOException {
-        numSegmentsCleanedUp++;
-        if (numSegmentsCleanedUp == numSegmentsContained) {
-          fs.delete(inName, true);
-        }
-      }
-    } // MySequenceFile.Sorter.SegmentContainer
-    
-  } // MySequenceFile.Sorter
-  
 } // MySequenceFile

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/master/thrift/MasterClientService.java Fri Dec 23 17:53:12 2011
@@ -2990,6 +2990,8 @@ import org.slf4j.LoggerFactory;
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);
@@ -13069,6 +13071,8 @@ import org.slf4j.LoggerFactory;
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/MutationLogger.java Fri Dec 23 17:53:12 2011
@@ -9319,6 +9319,8 @@ import org.slf4j.LoggerFactory;
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);

Modified: incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java (original)
+++ incubator/accumulo/trunk/src/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java Fri Dec 23 17:53:12 2011
@@ -9884,6 +9884,8 @@ import org.slf4j.LoggerFactory;
 
     private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
       try {
+        // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+        __isset_bit_vector = new BitSet(1);
         read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
       } catch (org.apache.thrift.TException te) {
         throw new java.io.IOException(te);

Modified: incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java (original)
+++ incubator/accumulo/trunk/src/core/src/test/java/org/apache/accumulo/core/data/MapFileTest.java Fri Dec 23 17:53:12 2011
@@ -20,24 +20,19 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Random;
 import java.util.Map.Entry;
+import java.util.Random;
 
 import junit.framework.TestCase;
 
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.map.MyMapFile;
-import org.apache.accumulo.core.file.map.MySequenceFile;
-import org.apache.accumulo.core.file.map.MySequenceFile.CompressionType;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 
@@ -55,7 +50,7 @@ public class MapFileTest extends TestCas
       /*****************************
        * write out the test map file
        */
-      MyMapFile.Writer mfw = new MyMapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Key.class, Value.class, CompressionType.BLOCK);
+      MapFile.Writer mfw = new MapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Key.class, Value.class);
       Value value = new Value(new byte[10]);
       for (int i = 0; i < 10; i++) {
         Text row = new Text(String.format("%08d", i));
@@ -101,7 +96,7 @@ public class MapFileTest extends TestCas
       /*****************************
        * write out the test map file
        */
-      MyMapFile.Writer mfw = new MyMapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class, CompressionType.BLOCK);
+      MapFile.Writer mfw = new MapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class);
       Text key = new Text();
       BytesWritable value;
       Random r = new Random();
@@ -229,73 +224,11 @@ public class MapFileTest extends TestCas
     }
   }
   
-  public void testMapFileFix() {
-    try {
-      Configuration conf = CachedConfiguration.getInstance();
-      FileSystem fs = FileSystem.get(conf);
-      conf.setInt("io.seqfile.compress.blocksize", 4000);
-      
-      for (CompressionType compressionType : CompressionType.values()) {
-        /*****************************
-         * write out the test map file
-         */
-        MyMapFile.Writer mfw = new MyMapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class, compressionType);
-        BytesWritable value;
-        Random r = new Random();
-        byte[] bytes = new byte[1024];
-        for (int i = 0; i < 1000; i++) {
-          String keyString = Integer.toString(i + 1000000);
-          Text key = new Text(keyString);
-          r.nextBytes(bytes);
-          value = new BytesWritable(bytes);
-          mfw.append(key, value);
-        }
-        mfw.close();
-        
-        /************************************
-         * move the index file
-         */
-        fs.rename(new Path("/tmp/testMapFileIndexingMap/index"), new Path("/tmp/testMapFileIndexingMap/oldIndex"));
-        
-        /************************************
-         * recreate the index
-         */
-        MyMapFile.fix(fs, new Path("/tmp/testMapFileIndexingMap"), Text.class, BytesWritable.class, false, conf);
-        
-        /************************************
-         * compare old and new indices
-         */
-        MySequenceFile.Reader oldIndexReader = new MySequenceFile.Reader(fs, new Path("/tmp/testMapFileIndexingMap/oldIndex"), conf);
-        MySequenceFile.Reader newIndexReader = new MySequenceFile.Reader(fs, new Path("/tmp/testMapFileIndexingMap/index"), conf);
-        
-        Text oldKey = new Text();
-        Text newKey = new Text();
-        LongWritable oldValue = new LongWritable();
-        LongWritable newValue = new LongWritable();
-        while (true) {
-          boolean moreKeys = false;
-          // check for the same number of records
-          assertTrue((moreKeys = oldIndexReader.next(oldKey, oldValue)) == newIndexReader.next(newKey, newValue));
-          if (!moreKeys)
-            break;
-          assertTrue(oldKey.compareTo(newKey) == 0);
-          assertTrue(oldValue.compareTo(newValue) == 0);
-        }
-        oldIndexReader.close();
-        newIndexReader.close();
-        
-        fs.delete(new Path("/tmp/testMapFileIndexingMap"), true);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-  
   public static void main(String[] args) {
     try {
       Configuration conf = CachedConfiguration.getInstance();
       FileSystem fs = FileSystem.get(conf);
-      MyMapFile.Writer mfw = new MyMapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class, CompressionType.BLOCK);
+      MapFile.Writer mfw = new MapFile.Writer(conf, fs, "/tmp/testMapFileIndexingMap", Text.class, BytesWritable.class);
       Text key = new Text();
       BytesWritable value;
       Random r = new Random();

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1222766&r1=1222765&r2=1222766&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Fri Dec 23 17:53:12 2011
@@ -208,10 +208,10 @@ public class BulkImport extends MasterRe
         }
       } else {
         // assume it is a map file
-        extension = MyMapFile.EXTENSION;
+        extension = Constants.MAPFILE_EXTENSION;
       }
       
-      if (extension.equals(MyMapFile.EXTENSION)) {
+      if (extension.equals(Constants.MAPFILE_EXTENSION)) {
         if (!fileStatus.isDir()) {
           log.warn(fileStatus.getPath() + " is not a map file, ignoring");
           continue;