You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2013/02/06 15:08:27 UTC

svn commit: r1442969 - in /hama/trunk/core/src: main/java/org/apache/hama/ main/java/org/apache/hama/bsp/message/io/ main/java/org/apache/hama/bsp/message/queue/ test/java/org/apache/hama/bsp/message/

Author: tommaso
Date: Wed Feb  6 14:08:26 2013
New Revision: 1442969

URL: http://svn.apache.org/viewvc?rev=1442969&view=rev
Log:
HAMA-721 - committed Suraj's patch

Modified:
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Wed Feb  6 14:08:26 2013
@@ -64,12 +64,12 @@ public interface Constants {
   public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
 
   public static final String MAX_TASKS_PER_JOB = "bsp.max.tasks.per.job";
-  
+
   public static final String COMBINER_CLASS = "bsp.combiner.class";
 
   public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
-  
-  ////////////////////////////////////////
+
+  // //////////////////////////////////////
   // Task scheduler related constants
   // //////////////////////////////////////
 
@@ -97,11 +97,20 @@ public interface Constants {
   // /////////////////////////////////////////////
   // Job configuration related parameters.
   // /////////////////////////////////////////////
-  public static final String JOB_INPUT_DIR      = "bsp.input.dir";
-  public static final String JOB_PEERS_COUNT    = "bsp.peers.num";
-  public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class"; 
+  public static final String JOB_INPUT_DIR = "bsp.input.dir";
+  public static final String JOB_PEERS_COUNT = "bsp.peers.num";
+  public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class";
   public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class";
-  
+  public static final String MESSAGE_CLASS = "bsp.message.type.class";
+
+  // /////////////////////////////////////////////
+  // Messaging related parameters.
+  // /////////////////////////////////////////////
+  public static final int BUFFER_DEFAULT_SIZE = 16 * 1024;
+  public static final String BYTEBUFFER_SIZE = "bsp.message.bytebuffer.size";
+  public static final String BYTEBUFFER_DIRECT = "bsp.message.bytebuffer.direct";
+  public static final boolean BYTEBUFFER_DIRECT_DEFAULT = true;
+  public static final String DATA_SPILL_PATH = "bsp.data.spill.location";
 
   // /////////////////////////////////////////////
   // Constants related to partitioning
@@ -112,7 +121,6 @@ public interface Constants {
   public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks";
   public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter";
 
-
   // /////////////////////////////////////
   // Constants for ZooKeeper
   // /////////////////////////////////////

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java Wed Feb  6 14:08:26 2013
@@ -50,7 +50,8 @@ class BufferReadStatus extends ReadIndex
   }
 
   @Override
-  public void startReading() {
+  public boolean startReading() {
+    return true;
   }
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java Wed Feb  6 14:08:26 2013
@@ -142,12 +142,14 @@ public class PreFetchCache<M extends Wri
 
   public void startFetching(Class<M> classObject,
       SpilledDataInputBuffer buffer, Configuration conf)
-      throws InterruptedException {
+      throws InterruptedException, IOException {
 
     preFetchThread = new PreFetchThread<M>(classObject, objectListArr,
         capacity, buffer, totalMessages, status, conf);
     preFetchThread.start();
-    status.startReading();
+    if(!status.startReading()){
+      throw new IOException("Failed to start reading the spilled file: ");
+    }
     arrIndex = status.getReadBufferIndex();
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java Wed Feb  6 14:08:26 2013
@@ -52,7 +52,7 @@ abstract class ReadIndexStatus {
   /**
    * Indicate to start reading.
    */
-  public abstract void startReading();
+  public abstract boolean startReading();
 
 }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java Wed Feb  6 14:08:26 2013
@@ -22,8 +22,8 @@ import java.util.BitSet;
 
 /**
  * This class provides a thread-safe interface for both the spilling thread and
- * the thread that is writing into <code>SpillingBuffer</code>. It stores the
- * state and manipulates the next available buffer for both the threads.
+ * the thread that is writing into {@link SpillingDataOutputBuffer}. It stores
+ * the state and manipulates the next available buffer for both the threads.
  */
 class SpillWriteIndexStatus {
 
@@ -33,6 +33,7 @@ class SpillWriteIndexStatus {
   private volatile boolean spillStart;
   private int numBuffers;
   private volatile BitSet bufferBitState;
+  private volatile boolean errorState;
 
   SpillWriteIndexStatus(int size, int bufferCount, int bufferIndex,
       int fileWriteIndex, BitSet bufferBitState) {
@@ -42,6 +43,7 @@ class SpillWriteIndexStatus {
     processorBufferIndex = fileWriteIndex;
     numBuffers = bufferCount;
     this.bufferBitState = bufferBitState;
+    errorState = false;
   }
 
   /**
@@ -57,14 +59,23 @@ class SpillWriteIndexStatus {
     bufferBitState.set(bufferListWriteIndex, true);
     notify();
     bufferListWriteIndex = (bufferListWriteIndex + 1) % numBuffers;
-    while (bufferBitState.get(bufferListWriteIndex)) {
+    while (bufferBitState.get(bufferListWriteIndex) && !errorState) {
       try {
         wait();
       } catch (InterruptedException e) {
         throw new IOException(e);
       }
     }
-    return bufferListWriteIndex;
+    return checkError(bufferListWriteIndex);
+  }
+
+  private int checkError(int index) {
+    return errorState ? -1 : index;
+  }
+
+  public void notifyError() {
+    errorState = true;
+    notify();
   }
 
   /**
@@ -90,15 +101,16 @@ class SpillWriteIndexStatus {
       notify();
     }
     processorBufferIndex = (processorBufferIndex + 1) % numBuffers;
-    while (!bufferBitState.get(processorBufferIndex) && !spillComplete) {
+    while (!bufferBitState.get(processorBufferIndex) && !spillComplete
+        && !errorState) {
       wait();
     }
     // Is the last buffer written to file after the spilling is complete?
     // then complete the operation.
-    if (spillComplete && bufferBitState.isEmpty()) {
+    if ((spillComplete && bufferBitState.isEmpty()) || errorState) {
       return -1;
     }
-    return processorBufferIndex;
+    return checkError(processorBufferIndex);
   }
 
   /**
@@ -107,12 +119,14 @@ class SpillWriteIndexStatus {
    * spilled.
    * 
    * @throws InterruptedException
+   * @return whether the spill started correctly
    */
-  public synchronized void startSpilling() throws InterruptedException {
+  public synchronized boolean startSpilling() throws InterruptedException {
 
-    while (!spillStart) {
+    while (!spillStart && !errorState) {
       wait();
     }
+    return !errorState;
   }
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java Wed Feb  6 14:08:26 2013
@@ -23,10 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
 import java.util.BitSet;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -35,10 +32,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import org.apache.hama.bsp.message.io.BufferReadStatus;
-import org.apache.hama.bsp.message.io.ReadIndexStatus;
-import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
-import org.apache.hama.bsp.message.io.SpilledDataReadStatus;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * <code>SpilledDataInputBuffer</code> class is designed to read from the
@@ -49,6 +44,8 @@ import org.apache.hama.bsp.message.io.Sp
  */
 public class SpilledDataInputBuffer extends DataInputStream implements
     DataInput {
+  private static final Log LOG = LogFactory
+      .getLog(SpilledDataInputBuffer.class);
 
   /**
    * The thread is used to asynchronously read from the spilled file and load
@@ -57,7 +54,7 @@ public class SpilledDataInputBuffer exte
   static class SpillReadThread implements Callable<Boolean> {
 
     private String fileName;
-    private List<ByteBuffer> bufferList_;
+    private List<SpilledByteBuffer> bufferList_;
     private long bytesToRead_;
     private long bytesWrittenInFile_;
     private SpilledDataReadStatus status_;
@@ -73,7 +70,7 @@ public class SpilledDataInputBuffer exte
      * @param status The shared object that synchronizes the indexes for buffer
      *          to fill the data with.
      */
-    public SpillReadThread(String fileName, List<ByteBuffer> bufferList,
+    public SpillReadThread(String fileName, List<SpilledByteBuffer> bufferList,
         SpilledDataReadStatus status) {
       this.fileName = fileName;
       bufferList_ = bufferList;
@@ -92,7 +89,6 @@ public class SpilledDataInputBuffer exte
       FileChannel fc = raf.getChannel();
       bytesToRead_ = fc.size();
       bytesWrittenInFile_ = bytesToRead_;
-      MappedByteBuffer mBuffer = null;
       long fileReadPos = 0;
       int fileReadIndex = -1;
       do {
@@ -105,13 +101,14 @@ public class SpilledDataInputBuffer exte
         if (fileReadIndex < 0)
           break;
 
-        ByteBuffer buffer = bufferList_.get(fileReadIndex);
+        SpilledByteBuffer buffer = bufferList_.get(fileReadIndex);
         buffer.clear();
         long readSize = Math.min(buffer.remaining(),
             (bytesWrittenInFile_ - fileReadPos));
-
-        mBuffer = fc.map(MapMode.READ_ONLY, fileReadPos, readSize);
-        buffer.put(mBuffer);
+        readSize = fc.read(buffer.getByteBuffer());
+        if (readSize < 0) {
+          break;
+        }
         buffer.flip();
         bytesToRead_ -= readSize;
         fileReadPos += readSize;
@@ -136,7 +133,13 @@ public class SpilledDataInputBuffer exte
 
     @Override
     public Boolean call() throws Exception {
-      keepReadingFromFile();
+      try {
+        keepReadingFromFile();
+      } catch (Exception e) {
+        LOG.error("Error reading from file: " + fileName, e);
+        status_.notifyError();
+        return Boolean.FALSE;
+      }
       return Boolean.TRUE;
     }
 
@@ -152,7 +155,7 @@ public class SpilledDataInputBuffer exte
   static class SpilledInputStream extends InputStream {
 
     private String fileName_;
-    private List<ByteBuffer> bufferList_;
+    private List<SpilledByteBuffer> bufferList_;
     private boolean spilledAlready_;
     ReadIndexStatus status_;
     private final byte[] readByte = new byte[1];
@@ -162,13 +165,14 @@ public class SpilledDataInputBuffer exte
     private Callable<Boolean> spillReadThread_;
     private Future<Boolean> spillReadState_;
     private ExecutorService spillThreadService_;
-    private ByteBuffer currentReadBuffer_;
+    private SpilledByteBuffer currentReadBuffer_;
     private BitSet bufferBitState_;
 
     private boolean closed_;
 
     public SpilledInputStream(String fileName, boolean direct,
-        List<ByteBuffer> bufferList, boolean hasSpilled) throws IOException {
+        List<SpilledByteBuffer> bufferList, boolean hasSpilled)
+        throws IOException {
       fileName_ = fileName;
       bufferList_ = bufferList;
       spilledAlready_ = hasSpilled;
@@ -193,7 +197,9 @@ public class SpilledDataInputBuffer exte
             (SpilledDataReadStatus) status_);
         spillThreadService_ = Executors.newFixedThreadPool(1);
         spillReadState_ = spillThreadService_.submit(spillReadThread_);
-        status_.startReading();
+        if (!status_.startReading()) {
+          throw new IOException("Failed to read the spilled file: " + fileName_);
+        }
       }
       try {
         currentReadBuffer_ = getNextBuffer();
@@ -217,7 +223,7 @@ public class SpilledDataInputBuffer exte
       }
     }
 
-    public ByteBuffer getNextBuffer() throws InterruptedException {
+    public SpilledByteBuffer getNextBuffer() throws InterruptedException {
       int index = status_.getReadBufferIndex();
       if (index >= 0 && index < bufferList_.size()) {
         return bufferList_.get(index);
@@ -362,7 +368,7 @@ public class SpilledDataInputBuffer exte
   }
 
   public static SpilledDataInputBuffer getSpilledDataInputBuffer(
-      String fileName, boolean direct, List<ByteBuffer> bufferList)
+      String fileName, boolean direct, List<SpilledByteBuffer> bufferList)
       throws IOException {
     SpilledInputStream inStream = new SpilledInputStream(fileName, direct,
         bufferList, true);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java Wed Feb  6 14:08:26 2013
@@ -17,33 +17,34 @@
  */
 package org.apache.hama.bsp.message.io;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hadoop.conf.Configuration;
 
 /**
- * Base interface defining the behaviour to process the spilled data provided
- * in a byte buffer.
+ * Base interface defining the behaviour to process the spilled data provided in
+ * a byte buffer.
  */
 public interface SpilledDataProcessor {
 
   /**
    * Initialize the data processor.
+   * 
    * @param conf
    * @return true if no errors.
    */
   boolean init(Configuration conf);
-  
+
   /**
    * Override the method to define the action to be taken on the spilled data
    * provided in the byte buffer.
+   * 
    * @param buffer
    * @return true if no errors.
    */
-  boolean handleSpilledBuffer(ByteBuffer buffer);
-  
+  boolean handleSpilledBuffer(SpilledByteBuffer buffer);
+
   /**
    * Close the data processor.
+   * 
    * @return true if no errors.
    */
   boolean close();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java Wed Feb  6 14:08:26 2013
@@ -38,6 +38,7 @@ class SpilledDataReadStatus extends Read
   private volatile boolean fileReadComplete_;
   private volatile boolean bufferReadComplete_;
   private volatile BitSet bufferBitState_;
+  private volatile boolean errorState_;
 
   public SpilledDataReadStatus(int totalSize, BitSet bufferBitState) {
     readBufferIndex_ = -1;
@@ -47,8 +48,22 @@ class SpilledDataReadStatus extends Read
     bufferReadComplete_ = false;
     totalSize_ = totalSize;
     bufferBitState_ = bufferBitState;
+    errorState_ = false;
   }
 
+  private int checkError(int index) {
+    if (errorState_) {
+      return -1;
+    } else {
+      return index;
+    }
+  }
+
+  public void notifyError() {
+    errorState_ = true;
+    notify();
+  }
+  
   @Override
   public synchronized int getReadBufferIndex() throws InterruptedException {
 
@@ -59,14 +74,14 @@ class SpilledDataReadStatus extends Read
       notify();
     }
     readBufferIndex_ = (readBufferIndex_ + 1) % totalSize_;
-    while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_) {
+    while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_ && !errorState_) {
       wait();
     }
     // The file is completely read and transferred to buffers already.
     if (bufferBitState_.isEmpty() && fileReadComplete_) {
       return -1;
     }
-    return readBufferIndex_;
+    return checkError(readBufferIndex_);
   }
 
   @Override
@@ -91,7 +106,7 @@ class SpilledDataReadStatus extends Read
     fetchFileBufferIndex_ = (fetchFileBufferIndex_ + 1) % totalSize_;
 
     while (bufferBitState_.get(fetchFileBufferIndex_)
-        && !bufferReadComplete_) {
+        && !bufferReadComplete_ && !errorState_) {
       wait();
     }
 
@@ -99,7 +114,7 @@ class SpilledDataReadStatus extends Read
       return -1;
     }
 
-    return fetchFileBufferIndex_;
+    return checkError(fetchFileBufferIndex_);
   }
 
   @Override
@@ -121,14 +136,16 @@ class SpilledDataReadStatus extends Read
   }
 
   @Override
-  public synchronized void startReading() {
-    while (!spilledReadStart_)
+  public synchronized boolean startReading() {
+    while (!spilledReadStart_ && !errorState_) {
       try {
         wait();
       } catch (InterruptedException e) {
         LOG.error("Interrupted waiting to read the spilled file.", e);
         throw new RuntimeException(e);
       }
+    }
+    return !errorState_;
   }
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java Wed Feb  6 14:08:26 2013
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.math.BigInteger;
-import java.nio.ByteBuffer;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -36,11 +35,6 @@ import java.util.concurrent.Future;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hama.bsp.message.io.SpillWriteIndexStatus;
-import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
-import org.apache.hama.bsp.message.io.SpilledDataProcessor;
-import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
-import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
 
 /**
  * <code>SpillingBuffer</code> is an output stream comprised of byte arrays that
@@ -56,7 +50,8 @@ import org.apache.hama.bsp.message.io.Wr
  */
 public class SpillingDataOutputBuffer extends DataOutputStream {
 
-  private static final Log LOG = LogFactory.getLog(SpillingDataOutputBuffer.class);
+  private static final Log LOG = LogFactory
+      .getLog(SpillingDataOutputBuffer.class);
 
   /**
    * This thread is responsible for writing from the ByteBuffers in the list to
@@ -64,13 +59,13 @@ public class SpillingDataOutputBuffer ex
    */
   static class ProcessSpilledDataThread implements Callable<Boolean> {
     private SpillWriteIndexStatus status_;
-    private List<ByteBuffer> bufferList_;
+    private List<SpilledByteBuffer> bufferList_;
     private long fileWrittenSize_;
     private boolean closed;
     SpilledDataProcessor processor;
 
     ProcessSpilledDataThread(SpillWriteIndexStatus status,
-        List<ByteBuffer> bufferList, SpilledDataProcessor processor) {
+        List<SpilledByteBuffer> bufferList, SpilledDataProcessor processor) {
       status_ = status;
       bufferList_ = bufferList;
       closed = false;
@@ -86,6 +81,7 @@ public class SpillingDataOutputBuffer ex
     private void keepProcessingData() throws IOException {
 
       int fileWriteIndex = -1;
+
       do {
 
         try {
@@ -94,7 +90,7 @@ public class SpillingDataOutputBuffer ex
           throw new IOException(e1);
         }
         while (fileWriteIndex >= 0) {
-          ByteBuffer buffer = bufferList_.get(fileWriteIndex);
+          SpilledByteBuffer buffer = bufferList_.get(fileWriteIndex);
           processor.handleSpilledBuffer(buffer);
           buffer.clear();
           try {
@@ -107,6 +103,9 @@ public class SpillingDataOutputBuffer ex
         }
       } while (!closed);
 
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Done handling spilling data.");
+      }
 
     }
 
@@ -128,7 +127,13 @@ public class SpillingDataOutputBuffer ex
 
     @Override
     public Boolean call() throws Exception {
-      keepProcessingData();
+      try {
+        keepProcessingData();
+      } catch (Exception e) {
+        LOG.error("Error handling spilled data.", e);
+        status_.notifyError();
+        return Boolean.FALSE;
+      }
       return Boolean.TRUE;
     }
 
@@ -146,12 +151,13 @@ public class SpillingDataOutputBuffer ex
     final byte[] b;
     final boolean direct_;
 
-    private List<ByteBuffer> bufferList_;
+    private List<SpilledByteBuffer> bufferList_;
     private int bufferSize_;
     private BitSet bufferState_;
     private int numberBuffers_;
-    private ByteBuffer currentBuffer_;
-    private long bytesWritten_;
+    private SpilledByteBuffer currentBuffer_;
+    protected long bytesWritten_;
+    protected long bytesWrittenToBuffer;
     private long bytesRemaining_;
     private SpillWriteIndexStatus spillStatus_;
     private int thresholdSize_;
@@ -159,8 +165,9 @@ public class SpillingDataOutputBuffer ex
     private ProcessSpilledDataThread spillThread_;
     private ExecutorService spillThreadService_;
     private Future<Boolean> spillThreadState_;
-    private boolean closed_;
+    private boolean closed_;;
 
+    private int interBufferEndOfRecord;
     private SpilledDataProcessor processor;
     /**
      * The internal buffer where data is stored.
@@ -208,8 +215,8 @@ public class SpillingDataOutputBuffer ex
 
       assert (threshold >= bufferSize);
       assert (threshold < numBuffers * bufferSize);
-      if(interBufferSize > bufferSize){
-        interBufferSize = bufferSize/2;
+      if (interBufferSize > bufferSize) {
+        interBufferSize = bufferSize / 2;
       }
       defaultBufferSize_ = interBufferSize;
       this.b = new byte[1];
@@ -218,15 +225,11 @@ public class SpillingDataOutputBuffer ex
       direct_ = direct;
       numberBuffers_ = numBuffers;
       bufferSize_ = bufferSize;
-      bufferList_ = new ArrayList<ByteBuffer>(numberBuffers_);
+      bufferList_ = new ArrayList<SpilledByteBuffer>(numberBuffers_);
       bufferState_ = new BitSet(numBuffers);
 
       for (int i = 0; i < numBuffers / 2; ++i) {
-        if (direct_) {
-          bufferList_.add(ByteBuffer.allocateDirect(bufferSize_));
-        } else {
-          bufferList_.add(ByteBuffer.allocate(bufferSize_));
-        }
+        bufferList_.add(new SpilledByteBuffer(direct_, bufferSize_));
       }
       currentBuffer_ = bufferList_.get(0);
       bytesWritten_ = 0L;
@@ -241,13 +244,19 @@ public class SpillingDataOutputBuffer ex
       closed_ = false;
 
     }
-    
-    public void clear() throws IOException{
+
+    public void markEndOfRecord() {
+      interBufferEndOfRecord = (int) (this.bytesWrittenToBuffer + count);
+      if (currentBuffer_.capacity() > interBufferEndOfRecord)
+        this.currentBuffer_.markEndOfRecord(interBufferEndOfRecord);
+    }
+
+    public void clear() throws IOException {
       this.close();
       startedSpilling_ = false;
       bufferState_.clear();
 
-        for (ByteBuffer aBufferList_ : bufferList_) {
+        for (SpilledByteBuffer aBufferList_ : bufferList_) {
             aBufferList_.clear();
         }
       currentBuffer_ = bufferList_.get(0);
@@ -261,7 +270,7 @@ public class SpillingDataOutputBuffer ex
         buf[count++] = (byte) (b & 0xFF);
         return;
       }
-      
+
       this.b[0] = (byte) (b & 0xFF);
       write(this.b);
     }
@@ -277,49 +286,19 @@ public class SpillingDataOutputBuffer ex
      * 
      * @param len
      * @throws InterruptedException
+     * @throws IOException
      */
-    private void startSpilling() throws InterruptedException {
+    private void startSpilling() throws InterruptedException, IOException {
       synchronized (this) {
         spillThread_ = new ProcessSpilledDataThread(spillStatus_, bufferList_,
             processor);
         startedSpilling_ = true;
         spillThreadService_ = Executors.newFixedThreadPool(1);
         spillThreadState_ = spillThreadService_.submit(spillThread_);
-        spillStatus_.startSpilling();
-      }
-      // }
-    }
-
-    public void perfectFillWrite(byte[] b, int off, int len) throws IOException {
-      int rem = currentBuffer_.remaining();
-      while (len > rem) {
-        currentBuffer_.put(b, off, rem);
-        // bytesWritten_ += len;
-        // if (bytesWritten_ > thresholdSize_ && !startedSpilling_) {
-        // try {
-        // startSpilling(rem);
-        // } catch (InterruptedException e) {
-        // throw new IOException("Internal error occured writing to buffer.",
-        // e);
-        // }
-        // }
-
-        currentBuffer_.flip();
-        int index = spillStatus_.getNextBufferIndex();
-        currentBuffer_ = getBuffer(index);
-        off += rem;
-        len -= rem;
-        rem = currentBuffer_.remaining();
+        if (!spillStatus_.startSpilling()) {
+          throw new IOException("Could not start spilling on disk.");
+        }
       }
-      currentBuffer_.put(b, off, len);
-      // bytesWritten_ += len;
-      // if (bytesWritten_ > thresholdSize_ && !startedSpilling_) {
-      // try {
-      // startSpilling(len);
-      // } catch (InterruptedException e) {
-      // throw new IOException("Internal error occured writing to buffer.", e);
-      // }
-      // }
     }
 
     @Override
@@ -341,10 +320,11 @@ public class SpillingDataOutputBuffer ex
       count += len;
     }
 
-    private void writeInternal(byte[] b, int off, int len) throws IOException {
+    @SuppressWarnings("unused")
+    private void writeInternalImperfect(byte[] b, int off, int len)
+        throws IOException {
 
-      bytesWritten_ += len;
-      if (bytesWritten_ >= thresholdSize_ && !startedSpilling_) {
+      if (!startedSpilling_ && bytesWritten_ >= thresholdSize_) {
         try {
           startSpilling();
         } catch (InterruptedException e) {
@@ -356,10 +336,50 @@ public class SpillingDataOutputBuffer ex
         currentBuffer_.flip();
         currentBuffer_ = getBuffer(spillStatus_.getNextBufferIndex());
         bytesRemaining_ = bufferSize_;
+        this.bytesWrittenToBuffer = bytesWritten_;
+      }
+      currentBuffer_.put(b, off, len);
+      bytesRemaining_ -= len;
+      bytesWritten_ += len;
+
+    }
+
+    public void writeInternal(byte[] b, int off, int len) throws IOException {
+      int rem = currentBuffer_.remaining();
+      while (len > rem) {
+        currentBuffer_.put(b, off, rem);
+        bytesWritten_ += rem;
+        if (!startedSpilling_) {
+          checkSpillStart();
+        }
+        currentBuffer_.flip();
+        currentBuffer_ = getBuffer(spillStatus_.getNextBufferIndex());
+        if (currentBuffer_ == null)
+          throw new IOException(
+              "Error writing to spilling buffer. Could not get free buffer.");
+        bytesRemaining_ = bufferSize_;
+        this.bytesWrittenToBuffer = 0;
+        off += rem;
+        len -= rem;
+        rem = currentBuffer_.remaining();
       }
       currentBuffer_.put(b, off, len);
+      bytesWritten_ += len;
       bytesRemaining_ -= len;
+      if (!startedSpilling_) {
+        checkSpillStart();
+      }
+      this.bytesWrittenToBuffer += len;
+    }
 
+    private void checkSpillStart() throws IOException {
+      if (bytesWritten_ >= thresholdSize_) {
+        try {
+          startSpilling();
+        } catch (InterruptedException e) {
+          throw new IOException("Internal error occured writing to buffer.", e);
+        }
+      }
     }
 
     /** Flush the internal buffer */
@@ -377,16 +397,13 @@ public class SpillingDataOutputBuffer ex
      * @return
      * @throws IOException
      */
-    ByteBuffer getBuffer(int index) throws IOException {
-
+    SpilledByteBuffer getBuffer(int index) throws IOException {
+      if(index < 0){
+        return null;
+      }
       if (index >= bufferList_.size()) {
-        if (direct_) {
-          bufferList_.add(index, ByteBuffer.allocateDirect(bufferSize_));
-        } else {
-          bufferList_.add(index, ByteBuffer.allocate(bufferSize_));
-        }
+        bufferList_.add(new SpilledByteBuffer(direct_, bufferSize_));
       }
-
       return bufferList_.get(index);
     }
 
@@ -422,7 +439,7 @@ public class SpillingDataOutputBuffer ex
           this.processor.close();
           this.spillThreadService_.shutdownNow();
         }
-        
+
       }
     }
 
@@ -432,28 +449,27 @@ public class SpillingDataOutputBuffer ex
    * Initialize the spilling buffer with spilling file name
    * 
    * @param fileName name of the file.
-   * @throws FileNotFoundException 
+   * @throws FileNotFoundException
    */
   public SpillingDataOutputBuffer(String fileName) throws FileNotFoundException {
-    super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, 
+    super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
         new WriteSpilledDataProcessor(fileName)));
   }
-  
-  public SpillingDataOutputBuffer(SpilledDataProcessor processor) throws FileNotFoundException {
-    super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, 
-        processor));
+
+  public SpillingDataOutputBuffer(SpilledDataProcessor processor)
+      throws FileNotFoundException {
+    super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, processor));
   }
-  
-  
 
   /**
    * Initializes the spilling buffer.
-   * @throws FileNotFoundException 
+   * 
+   * @throws FileNotFoundException
    */
   public SpillingDataOutputBuffer() throws FileNotFoundException {
     super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
-        new WriteSpilledDataProcessor(
-        System.getProperty("java.io.tmpdir") + File.separatorChar
+        new WriteSpilledDataProcessor(System.getProperty("java.io.tmpdir")
+            + File.separatorChar
             + new BigInteger(128, new SecureRandom()).toString(32))));
   }
 
@@ -465,32 +481,36 @@ public class SpillingDataOutputBuffer ex
    * @param direct
    * @param fileName
    */
-  public SpillingDataOutputBuffer(int bufferCount, int bufferSize, int threshold,
-      boolean direct, SpilledDataProcessor processor) {
+  public SpillingDataOutputBuffer(int bufferCount, int bufferSize,
+      int threshold, boolean direct, SpilledDataProcessor processor) {
     super(new SpillingStream(bufferCount, bufferSize, threshold, direct,
         processor));
   }
-  
-  public void clear() throws IOException{
+
+  public void clear() throws IOException {
     SpillingStream stream = (SpillingStream) this.out;
     stream.clear();
   }
-  
-  public boolean hasSpilled(){
+
+  public boolean hasSpilled() {
     return ((SpillingStream) this.out).startedSpilling_;
   }
 
+  public void markRecordEnd() {
+    ((SpillingStream) this.out).markEndOfRecord();
+  }
+
   /**
    * Provides an input stream to read from the spilling buffer.
    * 
    * @throws IOException
    */
-  public SpilledDataInputBuffer getInputStreamToRead(String fileName) throws IOException {
+  public SpilledDataInputBuffer getInputStreamToRead(String fileName)
+      throws IOException {
 
     SpillingStream stream = (SpillingStream) this.out;
     SpilledDataInputBuffer.SpilledInputStream inStream = new SpilledDataInputBuffer.SpilledInputStream(
-        fileName, stream.direct_, stream.bufferList_,
-        stream.startedSpilling_);
+        fileName, stream.direct_, stream.bufferList_, stream.startedSpilling_);
     inStream.prepareRead();
     return new SpilledDataInputBuffer(inStream);
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java Wed Feb  6 14:08:26 2013
@@ -17,10 +17,10 @@
  */
 package org.apache.hama.bsp.message.io;
 
+import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
 import org.apache.commons.logging.Log;
@@ -28,8 +28,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 
 /**
- * 
- *
+ * A {@link SpilledDataProcessor} that writes the spilled data to the file.
  */
 public class WriteSpilledDataProcessor implements SpilledDataProcessor {
 
@@ -37,28 +36,43 @@ public class WriteSpilledDataProcessor i
       .getLog(WriteSpilledDataProcessor.class);
 
   private FileChannel fileChannel;
-  private RandomAccessFile raf;
   private String fileName;
-
+  
   public WriteSpilledDataProcessor(String fileName)
       throws FileNotFoundException {
     this.fileName = fileName;
-    raf = new RandomAccessFile(fileName, "rw");
-    fileChannel = raf.getChannel();
+  }
+
+  private void initializeFileChannel() {
+    FileOutputStream stream;
+    try {
+      stream = new FileOutputStream(new File(fileName), true);
+    } catch (FileNotFoundException e) {
+      LOG.error("Error opening file to write spilled data.", e);
+      throw new RuntimeException(e);
+    }
+    fileChannel = stream.getChannel();
   }
 
   @Override
   public boolean init(Configuration conf) {
+
     return true;
   }
 
   @Override
-  public boolean handleSpilledBuffer(ByteBuffer buffer) {
+  public boolean handleSpilledBuffer(SpilledByteBuffer buffer) {
     try {
-      fileChannel.write(buffer);
+      
+      if(fileChannel == null){
+        initializeFileChannel();
+      }
+      
+      fileChannel.write(buffer.getByteBuffer());
+      fileChannel.force(true);
       return true;
     } catch (IOException e) {
-      LOG.error("Error writing to file:"+fileName, e);
+      LOG.error("Error writing to file:" + fileName, e);
     }
     return false;
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Wed Feb  6 14:08:26 2013
@@ -305,4 +305,9 @@ public final class DiskQueue<M extends W
         .getJobID().toString()), id.getTaskID().toString());
   }
 
+  @Override
+  public boolean isMessageSerialized() {
+    return false;
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Wed Feb  6 14:08:26 2013
@@ -103,4 +103,9 @@ public final class MemoryQueue<M extends
 
   }
 
+  @Override
+  public boolean isMessageSerialized() {
+    return false;
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Wed Feb  6 14:08:26 2013
@@ -79,5 +79,11 @@ public interface MessageQueue<M> extends
    * @return how many items are in the queue.
    */
   public int size();
+  
+  /**
+   * 
+   * @return true if the messages in the queue are serialized to byte buffers.
+   */
+  public boolean isMessageSerialized();
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java Wed Feb  6 14:08:26 2013
@@ -105,4 +105,9 @@ public final class SortedMessageQueue<M 
 
   }
 
+  @Override
+  public boolean isMessageSerialized() {
+    return false;
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Wed Feb  6 14:08:26 2013
@@ -31,16 +31,17 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
 import org.apache.hama.bsp.message.io.PreFetchCache;
 import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
 import org.apache.hama.bsp.message.io.SpilledDataProcessor;
 import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
-import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
 
 /**
  * 
- *
+ * 
  * @param <M>
  */
 public class SpillingQueue<M extends Writable> implements MessageQueue<M> {
@@ -67,26 +68,25 @@ public class SpillingQueue<M extends Wri
   private SpilledDataInputBuffer spilledInput;
   private boolean objectWritableMode;
   private ObjectWritable objectWritable;
-  
+
   private Class<M> messageClass;
   private PreFetchCache<M> prefetchCache;
   private boolean enablePrefetch;
 
-  
   private class SpillIterator implements Iterator<M> {
 
     private boolean objectMode;
     private Class<M> classObject;
     private M messageHolder;
-    
-    public SpillIterator(boolean mode, Class<M> classObj, Configuration conf){
+
+    public SpillIterator(boolean mode, Class<M> classObj, Configuration conf) {
       this.objectMode = mode;
       this.classObject = classObj;
-      if(classObject != null){
+      if (classObject != null) {
         messageHolder = ReflectionUtils.newInstance(classObj, conf);
       }
     }
-    
+
     @Override
     public boolean hasNext() {
       return numMessagesRead != numMessagesWritten && numMessagesWritten > 0;
@@ -94,10 +94,9 @@ public class SpillingQueue<M extends Wri
 
     @Override
     public M next() {
-      if(objectMode){
+      if (objectMode) {
         return poll();
-      }
-      else {
+      } else {
         return poll(messageHolder);
       }
     }
@@ -134,6 +133,7 @@ public class SpillingQueue<M extends Wri
       } else {
         msg.write(spillOutputBuffer);
       }
+      spillOutputBuffer.markRecordEnd();
       ++numMessagesWritten;
     } catch (IOException e) {
       LOG.error("Error adding message.", e);
@@ -189,19 +189,21 @@ public class SpillingQueue<M extends Wri
   public void init(Configuration conf, TaskAttemptID arg1) {
 
     bufferCount = conf.getInt(SPILLBUFFER_COUNT, 3);
-    bufferSize = conf.getInt(SPILLBUFFER_SIZE, 16 * 1024);
+    bufferSize = conf.getInt(SPILLBUFFER_SIZE, Constants.BUFFER_DEFAULT_SIZE);
     direct = conf.getBoolean(SPILLBUFFER_DIRECT, true);
-    threshold = conf.getInt(SPILLBUFFER_THRESHOLD, 16 * 1024);
+    threshold = conf.getInt(SPILLBUFFER_THRESHOLD,
+        Constants.BUFFER_DEFAULT_SIZE);
     fileName = conf.get(SPILLBUFFER_FILENAME,
         System.getProperty("java.io.tmpdir") + File.separatorChar
             + new BigInteger(128, new SecureRandom()).toString(32));
-    
-    messageClass = (Class<M>) conf.getClass(SPILLBUFFER_MSGCLASS, null);
+
+    messageClass = (Class<M>) conf.getClass(Constants.MESSAGE_CLASS, null);
     objectWritableMode = messageClass == null;
-    
+
     SpilledDataProcessor processor;
     try {
-      processor = new WriteSpilledDataProcessor(fileName);
+      processor = new CombineSpilledDataProcessor<M>(fileName);
+      processor.init(conf);
     } catch (FileNotFoundException e) {
       LOG.error("Error initializing spilled data stream.", e);
       throw new RuntimeException(e);
@@ -212,14 +214,14 @@ public class SpillingQueue<M extends Wri
     objectWritable.setConf(conf);
     this.conf = conf;
   }
-  
-  private void incReadMsgCount(){
+
+  private void incReadMsgCount() {
     ++numMessagesRead;
   }
-  
+
   @SuppressWarnings("unchecked")
-  private M readDirect(M msg){
-    if(numMessagesRead >= numMessagesWritten){
+  private M readDirect(M msg) {
+    if (numMessagesRead >= numMessagesWritten) {
       return null;
     }
     try {
@@ -239,19 +241,18 @@ public class SpillingQueue<M extends Wri
   }
 
   public M poll(M msg) {
-    if(numMessagesRead >= numMessagesWritten){
+    if (numMessagesRead >= numMessagesWritten) {
       return null;
     }
-    if(enablePrefetch){
+    if (enablePrefetch) {
       return readFromPrefetch(msg);
-    }
-    else {
+    } else {
       return readDirect(msg);
     }
   }
-  
+
   @SuppressWarnings("unchecked")
-  private M readDirectObjectWritable(){
+  private M readDirectObjectWritable() {
     if (!objectWritableMode) {
       throw new IllegalStateException(
           "API call not supported. Set the configuration property "
@@ -266,34 +267,32 @@ public class SpillingQueue<M extends Wri
     }
     return (M) objectWritable.get();
   }
-  
+
   @SuppressWarnings({ "unchecked" })
-  private M readFromPrefetch(M msg){
-    if(objectWritableMode){
+  private M readFromPrefetch(M msg) {
+    if (objectWritableMode) {
       this.objectWritable = (ObjectWritable) prefetchCache.get();
       incReadMsgCount();
-      return (M)this.objectWritable.get();
-    }
-    else {
+      return (M) this.objectWritable.get();
+    } else {
       incReadMsgCount();
-      return (M)this.prefetchCache.get();
+      return (M) this.prefetchCache.get();
     }
-    
+
   }
 
   @Override
   public M poll() {
-    if(numMessagesRead >= numMessagesWritten){
+    if (numMessagesRead >= numMessagesWritten) {
       return null;
     }
-    
-    if(enablePrefetch){
+
+    if (enablePrefetch) {
       M msg = readFromPrefetch(null);
-      if(msg != null)
+      if (msg != null)
         incReadMsgCount();
       return msg;
-    }
-    else {
+    } else {
       return readDirectObjectWritable();
     }
   }
@@ -312,7 +311,7 @@ public class SpillingQueue<M extends Wri
       LOG.error("Error initializing the input spilled stream", e);
       throw new RuntimeException(e);
     }
-    if(conf.getBoolean(ENABLE_PREFETCH, false)){
+    if (conf.getBoolean(ENABLE_PREFETCH, false)) {
       this.prefetchCache = new PreFetchCache<M>(numMessagesWritten);
       this.enablePrefetch = true;
       try {
@@ -320,6 +319,8 @@ public class SpillingQueue<M extends Wri
       } catch (InterruptedException e) {
         LOG.error("Error starting prefetch on message queue.", e);
         throw new RuntimeException(e);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
     }
   }
@@ -333,5 +334,12 @@ public class SpillingQueue<M extends Wri
   public int size() {
     return numMessagesWritten;
   }
+  
+
+  @Override
+  public boolean isMessageSerialized() {
+    return true;
+  }
+
 
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java Wed Feb  6 14:08:26 2013
@@ -19,16 +19,34 @@ package org.apache.hama.bsp.message;
 
 import java.io.EOFException;
 import java.io.File;
+import java.io.RandomAccessFile;
 import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.security.SecureRandom;
+import java.util.Iterator;
 
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
+import org.apache.hama.bsp.message.io.DirectByteBufferInputStream;
+import org.apache.hama.bsp.message.io.DirectByteBufferOutputStream;
+import org.apache.hama.bsp.message.io.ReusableByteBuffer;
+import org.apache.hama.bsp.message.io.SpilledByteBuffer;
 import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataProcessor;
 import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
+import org.apache.hama.bsp.message.io.SyncFlushByteBufferOutputStream;
+import org.apache.hama.bsp.message.io.SyncReadByteBufferInputStream;
 import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
 
-import junit.framework.TestCase;
-
 public class TestMessageIO extends TestCase {
 
   public void testNonSpillBuffer() throws Exception {
@@ -39,23 +57,22 @@ public class TestMessageIO extends TestC
     for (int i = 0; i < 100; ++i) {
       text.write(outputBuffer);
     }
-
     assertTrue(outputBuffer != null);
     assertTrue(outputBuffer.size() == 4000);
     assertFalse(outputBuffer.hasSpilled());
-
     outputBuffer.close();
-
   }
 
   public void testSpillBuffer() throws Exception {
 
+    Configuration conf = new HamaConfiguration();
     String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
         + new BigInteger(128, new SecureRandom()).toString(32);
+    SpilledDataProcessor processor = new WriteSpilledDataProcessor(fileName);
+    processor.init(conf);
     SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
-        1024, 1024, true, new WriteSpilledDataProcessor(fileName));
+        1024, 1024, true, processor);
     Text text = new Text("Testing the spillage of spilling buffer");
-
     for (int i = 0; i < 100; ++i) {
       text.write(outputBuffer);
     }
@@ -70,48 +87,398 @@ public class TestMessageIO extends TestC
 
   }
 
-  public void testSpillInputStream() throws Exception {
-    String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
-        + new BigInteger(128, new SecureRandom()).toString(32);
-    SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
-        1024, 1024, true, new WriteSpilledDataProcessor(fileName));
-    Text text = new Text("Testing the spillage of spilling buffer");
+  public static class SumCombiner extends Combiner<IntWritable> {
 
+    @Override
+    public IntWritable combine(Iterable<IntWritable> messages) {
+      int sum = 0;
+      for (IntWritable intObj : messages) {
+        sum += intObj.get();
+      }
+      return new IntWritable(sum);
+    }
+
+  }
+
+  public void testSpillingByteBuffer() throws Exception {
+    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+    SpilledByteBuffer spillBuffer = new SpilledByteBuffer(buffer);
     for (int i = 0; i < 100; ++i) {
-      text.write(outputBuffer);
+      spillBuffer.putInt(i);
+      spillBuffer.markEndOfRecord();
     }
+    spillBuffer.putInt(100);
+    assertEquals(spillBuffer.getMarkofLastRecord(), 400);
+    assertEquals(spillBuffer.remaining(), (512 - 404));
+    spillBuffer.flip();
+    assertEquals(spillBuffer.remaining(), 404);
+    assertEquals(spillBuffer.getMarkofLastRecord(), 400);
 
-    assertTrue(outputBuffer != null);
-    assertTrue(outputBuffer.size() == 4000);
-    assertTrue(outputBuffer.hasSpilled());
-    File f = new File(fileName);
-    assertTrue(f.exists());
-    outputBuffer.close();
-    assertTrue(f.length() == 4000L);
+  }
 
-    SpilledDataInputBuffer inputBuffer = outputBuffer
-        .getInputStreamToRead(fileName);
+  public void testDirectByteBufferOutput() throws Exception {
+
+    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+    DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
+    stream.setBuffer(buffer);
+    IntWritable intWritable = new IntWritable(1);
 
     for (int i = 0; i < 100; ++i) {
-      text.readFields(inputBuffer);
-      assertTrue("Testing the spillage of spilling buffer".equals(text
-          .toString()));
-      text.clear();
+      intWritable.set(i);
+      intWritable.write(stream);
+    }
+
+    stream.close();
+
+    buffer.flip();
+    for (int i = 0; i < 100; ++i) {
+      assertEquals(i, buffer.getInt());
     }
 
     try {
-      text.readFields(inputBuffer);
+      buffer.getInt();
       assertTrue(false);
-    } catch (EOFException eof) {
+    } catch (Exception e) {
       assertTrue(true);
     }
 
-    inputBuffer.close();
-    inputBuffer.completeReading(false);
-    assertTrue(f.exists());
-    inputBuffer.completeReading(true);
-    assertFalse(f.exists());
+  }
+
+  public void testDirectByteBufferInput() throws Exception {
+    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+    DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
+    stream.setBuffer(buffer);
+    IntWritable intWritable = new IntWritable(1);
+
+    for (int i = 0; i < 100; ++i) {
+      intWritable.set(i);
+      intWritable.write(stream);
+    }
+    intWritable.write(stream);
+
+    stream.close();
+
+    buffer.flip();
+
+    DirectByteBufferInputStream inStream = new DirectByteBufferInputStream();
+
+    inStream.setBuffer(new SpilledByteBuffer(buffer, 400));
+    for (int i = 0; i < 100; ++i) {
+      intWritable.readFields(inStream);
+      assertEquals(i, intWritable.get());
+    }
+
+    assertFalse(inStream.hasDataToRead());
+    assertTrue(inStream.hasUnmarkData());
+    inStream.prepareForNext();
+
+    // push in another buffer and check if the unmarked data could be read.
+
+    buffer.clear();
+    stream = new DirectByteBufferOutputStream();
+    buffer = ByteBuffer.allocateDirect(2048);
+    stream.setBuffer(buffer);
+
+    for (int i = 0; i < 400; ++i) {
+      intWritable.set(i);
+      intWritable.write(stream);
+    }
+    stream.close();
+    buffer.flip();
+
+    inStream.setBuffer(new SpilledByteBuffer(buffer, 400));
+
+    // Read previous data
+    intWritable.readFields(inStream);
+    assertEquals(99, intWritable.get());
+
+    for (int i = 0; i < 100; ++i) {
+      intWritable.readFields(inStream);
+      assertEquals(i, intWritable.get());
+    }
+
+    assertFalse(inStream.hasDataToRead());
+    assertTrue(inStream.hasUnmarkData());
+    inStream.prepareForNext();
+
+    buffer.clear();
+    stream = new DirectByteBufferOutputStream();
+    stream.setBuffer(buffer);
 
+    for (int i = 0; i < 100; ++i) {
+      intWritable.set(i);
+      intWritable.write(stream);
+    }
+    stream.close();
+    buffer.flip();
+
+    inStream.setBuffer(new SpilledByteBuffer(buffer, 400));
+
+    // Read previous data with resized intermediate buffer
+    for (int i = 100; i < 400; ++i) {
+      intWritable.readFields(inStream);
+      assertEquals(i, intWritable.get());
+    }
+
+    for (int i = 0; i < 100; ++i) {
+      intWritable.readFields(inStream);
+      assertEquals(i, intWritable.get());
+    }
+
+    assertFalse(inStream.hasDataToRead());
+    assertFalse(inStream.hasUnmarkData());
+
+  }
+
+  /**
+   * 
+   * @throws Exception
+   */
+  public void testReusableByteBufferIter() throws Exception {
+
+    ReusableByteBuffer<IntWritable> reuseByteBuffer = new ReusableByteBuffer<IntWritable>(
+        new IntWritable());
+
+    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+    DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
+    stream.setBuffer(buffer);
+    IntWritable intWritable = new IntWritable(1);
+
+    for (int i = 0; i < 100; ++i) {
+      intWritable.set(i);
+      intWritable.write(stream);
+    }
+    intWritable.write(stream);
+    stream.close();
+    buffer.flip();
+    reuseByteBuffer.set(new SpilledByteBuffer(buffer, 400));
+
+    Iterator<IntWritable> iter = reuseByteBuffer.iterator();
+    int j = 0;
+    while (iter.hasNext()) {
+      assertEquals(iter.next().get(), j++);
+    }
+    assertEquals(j, 100);
+    reuseByteBuffer.prepareForNext();
+
+    buffer.clear();
+
+    stream = new DirectByteBufferOutputStream();
+    stream.setBuffer(buffer);
+
+    for (int i = 0; i < 101; ++i) {
+      intWritable.set(i);
+      intWritable.write(stream);
+    }
+    stream.close();
+    buffer.flip();
+
+    reuseByteBuffer.set(new SpilledByteBuffer(buffer, 404));
+    iter = reuseByteBuffer.iterator();
+    assertEquals(iter.next().get(), 99);
+
+    j = 0;
+    while (iter.hasNext()) {
+      assertEquals(iter.next().get(), j++);
+    }
+    buffer.clear();
+  }
+
+  public void testCombineProcessor() throws Exception {
+    String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
+        + new BigInteger(128, new SecureRandom()).toString(32);
+
+    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+    DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
+    stream.setBuffer(buffer);
+    IntWritable intWritable = new IntWritable(1);
+    int sum = 0;
+    for (int i = 0; i < 100; ++i) {
+      intWritable.set(i);
+      intWritable.write(stream);
+      sum += i;
+    }
+    intWritable.write(stream);
+    stream.close();
+    buffer.flip();
+
+    Configuration conf = new HamaConfiguration();
+
+    conf.setClass(Constants.MESSAGE_CLASS, IntWritable.class, Writable.class);
+    conf.setClass(Constants.COMBINER_CLASS, SumCombiner.class, Combiner.class);
+
+    CombineSpilledDataProcessor<IntWritable> processor = new CombineSpilledDataProcessor<IntWritable>(
+        fileName);
+    assertTrue(processor.init(conf));
+    File f = new File(fileName);
+    try {
+      assertTrue(processor.handleSpilledBuffer(new SpilledByteBuffer(buffer,
+          400)));
+      buffer.flip();
+      assertTrue(processor.handleSpilledBuffer(new SpilledByteBuffer(buffer,
+          400)));
+      assertTrue(processor.close());
+
+      assertTrue(f.exists());
+      assertEquals(f.length(), 8);
+
+      RandomAccessFile raf = new RandomAccessFile(fileName, "r");
+      FileChannel fileChannel = raf.getChannel();
+      ByteBuffer readBuff = ByteBuffer.allocateDirect(16);
+      fileChannel.read(readBuff);
+      readBuff.flip();
+      assertEquals(readBuff.getInt(), sum);
+      assertEquals(readBuff.getInt(), sum + 99);
+    } finally {
+      assertTrue(f.delete());
+    }
+
+  }
+
+  public void testSpillInputStream() throws Exception {
+
+    File f = null;
+    try {
+      String fileName = System.getProperty("java.io.tmpdir")
+          + File.separatorChar + "testSpillInputStream.txt";
+      Configuration conf = new HamaConfiguration();
+      SpilledDataProcessor processor = new WriteSpilledDataProcessor(fileName);
+      processor.init(conf);
+      SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
+          1024, 1024, true, processor);
+      Text text = new Text("Testing the spillage of spilling buffer");
+      for (int i = 0; i < 100; ++i) {
+        text.write(outputBuffer);
+        outputBuffer.markRecordEnd();
+      }
+
+      assertTrue(outputBuffer != null);
+      assertTrue(outputBuffer.size() == 4000);
+      assertTrue(outputBuffer.hasSpilled());
+      f = new File(fileName);
+      assertTrue(f.exists());
+      outputBuffer.close();
+      assertTrue(f.length() == 4000);// + (4000 / 1024 + 1) * 4));
+
+      SpilledDataInputBuffer inputBuffer = outputBuffer
+          .getInputStreamToRead(fileName);
+
+      for (int i = 0; i < 100; ++i) {
+        text.readFields(inputBuffer);
+        assertTrue("Testing the spillage of spilling buffer".equals(text
+            .toString()));
+        text.clear();
+      }
+
+      try {
+        text.readFields(inputBuffer);
+        assertTrue(false);
+      } catch (EOFException eof) {
+        assertTrue(true);
+      }
+
+      inputBuffer.close();
+      inputBuffer.completeReading(false);
+      assertTrue(f.exists());
+      inputBuffer.completeReading(true);
+      assertFalse(f.exists());
+    } finally {
+      if (f != null) {
+        if (f.exists()) {
+          f.delete();
+        }
+      }
+    }
+
+  }
+
+  public void testSyncFlushByteBufferOutputStream() throws Exception {
+
+    File f = null;
+    try {
+      String fileName = System.getProperty("java.io.tmpdir")
+          + File.separatorChar + "testSyncFlushByteBufferOutputStream.txt";
+      SyncFlushByteBufferOutputStream stream = new SyncFlushByteBufferOutputStream(
+          fileName);
+      DirectByteBufferOutputStream syncFlushStream = new DirectByteBufferOutputStream(
+          stream);
+      ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+      syncFlushStream.setBuffer(buffer);
+      IntWritable intWritable = new IntWritable(1);
+
+      for (int i = 0; i < 200; ++i) {
+        intWritable.set(i);
+        intWritable.write(syncFlushStream);
+      }
+      intWritable.write(syncFlushStream);
+      syncFlushStream.close();
+
+      f = new File(fileName);
+      assertTrue(f.exists());
+      assertTrue(f.length() == 804);
+      assertTrue(f.delete());
+    } finally {
+      if (f != null) {
+        f.delete();
+      }
+    }
+
+  }
+
+  public void testSyncFlushBufferInputStream() throws Exception {
+    File f = null;
+    try {
+      String fileName = System.getProperty("java.io.tmpdir")
+          + File.separatorChar + "testSyncFlushBufferInputStream.txt";
+      SyncFlushByteBufferOutputStream stream = new SyncFlushByteBufferOutputStream(
+          fileName);
+      DirectByteBufferOutputStream syncFlushStream = new DirectByteBufferOutputStream(
+          stream);
+      ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+      syncFlushStream.setBuffer(buffer);
+      IntWritable intWritable = new IntWritable(1);
+
+      for (int i = 0; i < 200; ++i) {
+        intWritable.set(i);
+        intWritable.write(syncFlushStream);
+      }
+      intWritable.write(syncFlushStream);
+      syncFlushStream.close();
+
+      f = new File(fileName);
+      assertTrue(f.exists());
+      assertEquals(f.length(), 804);
+
+      SyncReadByteBufferInputStream syncReadStream = new SyncReadByteBufferInputStream(
+          stream.isSpilled(), fileName);
+      DirectByteBufferInputStream inStream = new DirectByteBufferInputStream(
+          syncReadStream);
+      buffer.clear();
+      inStream.setBuffer(buffer);
+
+      for (int i = 0; i < 200; ++i) {
+        intWritable.readFields(inStream);
+        assertEquals(intWritable.get(), i);
+      }
+
+      intWritable.readFields(inStream);
+      assertEquals(intWritable.get(), 199);
+
+      try {
+        intWritable.readFields(inStream);
+        assertFalse(true);
+      } catch (Exception e) {
+        assertTrue(true);
+      }
+
+      inStream.close();
+      syncFlushStream.close();
+
+    } finally {
+      if (f != null) {
+        f.delete();
+      }
+    }
   }
 
 }