You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/01/13 21:52:04 UTC

svn commit: r1432734 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/message/io/ core/src/main/java/org/apache/hama/bsp/message/queue/ core/src/test/java/org/apache/hama/bsp/message/

Author: surajsmenon
Date: Sun Jan 13 20:52:03 2013
New Revision: 1432734

URL: http://svn.apache.org/viewvc?rev=1432734&view=rev
Log:
[HAMA-559] Added spilling queue.

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1432734&r1=1432733&r2=1432734&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sun Jan 13 20:52:03 2013
@@ -4,6 +4,7 @@ Release 0.7 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-559: Added spilling queue. (surajsmenon)
    HAMA-700: BSPPartitioner should be configurable to be optional and allow input format conversion (surajsmenon)
    HAMA-524: Add SpMV example (Mikalai Parafeniuk via edwardyoon)
    HAMA-658: Add random symmetric sparse matrix generator (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1432734&r1=1432733&r2=1432734&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Sun Jan 13 20:52:03 2013
@@ -68,6 +68,8 @@ public interface Constants {
   public static final String COMBINER_CLASS = "bsp.combiner.class";
 
   public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
+  
+  public static final String COMBINER_CLASS = "bsp.combiner.class";
 
   ////////////////////////////////////////
   // Task scheduler related constants

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+/**
+ * The {@link ReadIndexStatus} used when nothing was spilled in the first
+ * place: it hands out the in-memory buffer indexes one after another and
+ * reports -1 once they are used up. It has no file buffer to offer.
+ */
+class BufferReadStatus extends ReadIndexStatus {
+
+  // Index handed out by the previous call; -1 until the first call.
+  private int index;
+  // Number of buffers, so the valid indexes are 0 .. count - 1.
+  private final int count;
+
+  public BufferReadStatus(int bufferCount) {
+    index = -1;
+    count = bufferCount;
+  }
+
+  /** Returns the next buffer index, or -1 once every buffer was handed out. */
+  @Override
+  public int getReadBufferIndex() {
+    // count - 1 is the last valid index; never step past it.
+    return (index >= count - 1) ? -1 : ++index;
+  }
+
+  /** Always -1: this status never offers a buffer to load file data into. */
+  @Override
+  public int getFileBufferIndex() {
+    return -1;
+  }
+
+  @Override
+  public void completeReading() { /* no-op: nothing to signal */ }
+
+  @Override
+  public void startReading() { /* no-op: nothing to signal */ }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.message.io.PreFetchCache;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataReadStatus;
+
+/**
+ * Serves messages that a background thread pre-fetches from a
+ * {@link SpilledDataInputBuffer} into a fixed set of reusable lists.
+ */
+public class PreFetchCache<M extends Writable> {
+  private static final Log LOG = LogFactory.getLog(PreFetchCache.class);
+  private Object[] objectListArr;
+  private long totalMessages;
+  private int capacity;
+  private int arrIndex;
+  private int listIndex;
+  private PreFetchThread<M> preFetchThread;
+  private SpilledDataReadStatus status;
+  private BitSet bufferBitSet;
+
+  private static class PreFetchThread<M extends Writable> extends Thread {
+    private volatile boolean stopReading;
+    SpilledDataInputBuffer stream;
+    Class<M> objectClass;
+    private Object[] listArr;
+    SpilledDataReadStatus status;
+    private long totalMsgs;
+    private int listCapacity;
+    private int messageCount;
+    private Configuration conf;
+
+    private void fill(int index) throws IOException {
+      @SuppressWarnings("unchecked")
+      List<? super Writable> list = (List<? super Writable>) listArr[index];
+      for (int i = 0; i < listCapacity && messageCount < totalMsgs
+          && !stopReading; ++i) {
+        if (i == list.size()) { // no object in this slot yet: create one
+          if (objectClass == null) {
+            ObjectWritable writable = new ObjectWritable();
+            writable.readFields(stream);
+            list.add(i, writable);
+          } else {
+            M obj = ReflectionUtils.newInstance(objectClass, conf);
+            obj.readFields(stream);
+            list.add(i, obj);
+          }
+        } else { // reuse the object an earlier pass left in this slot
+          Writable obj = (Writable) list.get(i);
+          obj.readFields(stream);
+        }
+        ++messageCount;
+      }
+    }
+
+    public PreFetchThread(Class<M> classObj, Object[] objectListArr,
+        int capacity, SpilledDataInputBuffer inStream, long totalMessages,
+        SpilledDataReadStatus indexStatus, Configuration conf) {
+      objectClass = classObj;
+      listArr = objectListArr;
+      status = indexStatus;
+      this.conf = conf;
+      totalMsgs = totalMessages;
+      messageCount = 0;
+      this.stream = inStream;
+      listCapacity = capacity;
+    }
+
+    @Override
+    public void run() {
+      int index = 0;
+      try {
+        while ((index = status.getFileBufferIndex()) >= 0 && !stopReading) {
+          fill(index);
+          if (stopReading || messageCount == totalMsgs) {
+            status.closedBySpiller();
+            break;
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted reading pre-fetch buffer index.", e);
+      } catch (IOException e) {
+        LOG.error("Error reading pre-fetch buffer index.", e);
+      }
+      status.completeReading();
+    }
+
+    public synchronized void stopReading() {
+      stopReading = true;
+    }
+  }
+
+  public PreFetchCache(long totalMessages) {
+    this(2, totalMessages, 64);
+  }
+
+  public PreFetchCache(int numBuffers, long totalMessages) {
+    this(numBuffers, totalMessages, 64);
+  }
+
+  public PreFetchCache(int numBuffers, long totalMessages, int capacity) {
+    this.objectListArr = new Object[numBuffers];
+    this.totalMessages = totalMessages;
+    this.bufferBitSet = new BitSet();
+    status = new SpilledDataReadStatus(numBuffers, bufferBitSet);
+    for (int i = 0; i < numBuffers; ++i) {
+      this.objectListArr[i] = new ArrayList<M>(capacity);
+    }
+    this.capacity = capacity;
+  }
+
+  /** Starts the pre-fetch thread and obtains the first readable list index. */
+  public void startFetching(Class<M> classObject,
+      SpilledDataInputBuffer buffer, Configuration conf)
+      throws InterruptedException {
+    preFetchThread = new PreFetchThread<M>(classObject, objectListArr,
+        capacity, buffer, totalMessages, status, conf);
+    preFetchThread.start();
+    status.startReading();
+    arrIndex = status.getReadBufferIndex();
+  }
+
+  /** Returns the next message; null if no list is left or on interruption. */
+  @SuppressWarnings("unchecked")
+  public Writable get() {
+    if (listIndex == capacity) {
+      try {
+        arrIndex = status.getReadBufferIndex();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted getting prefetched records.", e);
+        Thread.currentThread().interrupt(); // keep the interrupt visible
+        return null;
+      }
+      if (arrIndex < 0) {
+        return null;
+      }
+      listIndex = 0;
+    }
+    return ((List<M>) (this.objectListArr[arrIndex])).get(listIndex++);
+  }
+
+  /** Signals completion, stops the pre-fetch thread and waits for it to end. */
+  public void close() {
+    status.completeReading();
+    this.preFetchThread.stopReading();
+    try {
+      this.preFetchThread.join();
+    } catch (InterruptedException e) {
+      LOG.error("Prefetch thread was interrupted.", e);
+      Thread.currentThread().interrupt(); // keep the interrupt visible
+    }
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+/**
+ * The base class that defines the shared object that synchronizes the
+ * indexes of the byte buffers between the thread that reads them and the
+ * thread that loads them, such that there is no loss of data.
+ */
+abstract class ReadIndexStatus {
+
+  /**
+   * Returns the index of the byte array that is ready for the data to be read
+   * from. The implementation may or may not block depending on the spilling
+   * situation.
+   * 
+   * @return the index; callers treat a negative value as "no buffer".
+   * @throws InterruptedException if interrupted while waiting for an index.
+   */
+  public abstract int getReadBufferIndex() throws InterruptedException;
+
+  /**
+   * Returns the index of the byte array that could be used to load the data
+   * from the spilled file. The implementation may or may not block
+   * depending on the spilling situation.
+   * 
+   * @return the index; callers treat a negative value as "no buffer".
+   * @throws InterruptedException if interrupted while waiting for an index.
+   */
+  public abstract int getFileBufferIndex() throws InterruptedException;
+
+  /**
+   * Indicates that the read operation is complete.
+   */
+  public abstract void completeReading();
+
+  /**
+   * Indicates that reading is about to start.
+   */
+  public abstract void startReading();
+
+}
+

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+/**
+ * This class provides a thread-safe interface for both the spilling thread and
+ * the thread that is writing into <code>SpillingBuffer</code>. It stores the
+ * state and manipulates the next available buffer for both the threads.
+ */
+class SpillWriteIndexStatus {
+
+  private volatile int bufferListWriteIndex; // buffer the writer works on
+  private volatile int processorBufferIndex; // buffer the spiller works on
+  private volatile boolean spillComplete;
+  private volatile boolean spillStart;
+  private int numBuffers;
+  private volatile BitSet bufferBitState; // bit i set: buffer i awaits spilling
+
+  SpillWriteIndexStatus(int size, int bufferCount, int bufferIndex,
+      int fileWriteIndex, BitSet bufferBitState) { // NOTE(review): size unused
+    spillComplete = false;
+    spillStart = false;
+    bufferListWriteIndex = bufferIndex;
+    processorBufferIndex = fileWriteIndex;
+    numBuffers = bufferCount;
+    this.bufferBitState = bufferBitState;
+  }
+
+  /**
+   * Marks the current write buffer as ready to be spilled and returns the index
+   * of the next buffer, blocking while that buffer still awaits spilling.
+   * 
+   * @return the index of the next available buffer
+   * @throws IOException when the thread is interrupted while blocked on the
+   *           availability of the next buffer.
+   */
+  public synchronized int getNextBufferIndex() throws IOException {
+    assert !spillComplete;
+    bufferBitState.set(bufferListWriteIndex, true); // hand over to the spiller
+    notify();
+    bufferListWriteIndex = (bufferListWriteIndex + 1) % numBuffers;
+    while (bufferBitState.get(bufferListWriteIndex)) { // not yet spilled
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+    }
+    return bufferListWriteIndex;
+  }
+
+  /**
+   * Returns the index of the next buffer from the list that has information
+   * written to be spilled to disk. The call blocks until a buffer is available
+   * and indicated by the data writing thread.
+   * 
+   * @return the index of the next to be spilled buffer, or -1 when done.
+   * @throws InterruptedException when the thread is interrupted while blocked
+   *           on the availability of the next buffer.
+   */
+  public synchronized int getNextProcessorBufferIndex()
+      throws InterruptedException {
+
+    // Indicate to main thread that the spilling thread has started.
+    if (!spillStart) {
+      spillStart = true;
+      notify();
+    }
+    if (processorBufferIndex >= 0) {
+      assert bufferBitState.get(processorBufferIndex);
+      bufferBitState.set(processorBufferIndex, false); // free it for the writer
+      notify();
+    }
+    processorBufferIndex = (processorBufferIndex + 1) % numBuffers;
+    while (!bufferBitState.get(processorBufferIndex) && !spillComplete) {
+      wait();
+    }
+    // Is the last buffer written to file after the spilling is complete?
+    // then complete the operation.
+    if (spillComplete && bufferBitState.isEmpty()) {
+      return -1;
+    }
+    return processorBufferIndex;
+  }
+
+  /**
+   * Blocks the caller until the spilling thread has started, i.e. until it
+   * has made its first call to {@link #getNextProcessorBufferIndex()} to get
+   * the buffer with data to be spilled.
+   * 
+   * @throws InterruptedException if interrupted while waiting.
+   */
+  public synchronized void startSpilling() throws InterruptedException {
+
+    while (!spillStart) {
+      wait();
+    }
+  }
+
+  /**
+   * This call informs the closing of the buffer and thereby indicating the
+   * spilling thread to complete.
+   */
+  public synchronized void spillCompleted() {
+    assert !spillComplete;
+    spillComplete = true;
+    bufferBitState.set(bufferListWriteIndex, true); // flag the last buffer too
+    notify();
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hama.bsp.message.io.BufferReadStatus;
+import org.apache.hama.bsp.message.io.ReadIndexStatus;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataReadStatus;
+
+/**
+ * <code>SpilledDataInputBuffer</code> class is designed to read from the
+ * spilling buffer. Depending on whether the records were spilled or not, the
+ * stream provides data from a list of byte arrays or from file. The contents of
+ * the file are asynchronously loaded in the byte arrays as the values are read.
+ * 
+ */
+public class SpilledDataInputBuffer extends DataInputStream implements
+    DataInput {
+
+  /**
+   * The thread is used to asynchronously read from the spilled file and load
+   * the buffer for the user to read from byte arrays in heap.
+   */
+  static class SpillReadThread implements Callable<Boolean> {
+    private String fileName;
+    private List<ByteBuffer> bufferList_;
+    private long bytesToRead_;
+    private long bytesWrittenInFile_;
+    private SpilledDataReadStatus status_;
+    private volatile boolean closed_; // may be set from another thread
+
+    /**
+     * Creates the task that reads the contents of the file and loads them
+     * into the list of byte buffers.
+     * 
+     * @param fileName Name of the file
+     * @param bufferList list of byte buffers to fill; the file size is
+     *          determined when reading starts.
+     * @param status The shared object that synchronizes the indexes for buffer
+     *          to fill the data with.
+     */
+    public SpillReadThread(String fileName, List<ByteBuffer> bufferList,
+        SpilledDataReadStatus status) {
+      this.fileName = fileName;
+      bufferList_ = bufferList;
+      status_ = status;
+      closed_ = false;
+    }
+
+    /**
+     * Keeps reading from file and loads the next available byte array with the
+     * data from the file. The file is closed even when reading fails.
+     * 
+     * @throws IOException on read failure or when interrupted while waiting.
+     */
+    private void keepReadingFromFile() throws IOException {
+      RandomAccessFile raf = new RandomAccessFile(fileName, "r");
+      try {
+        FileChannel fc = raf.getChannel();
+        bytesToRead_ = fc.size();
+        bytesWrittenInFile_ = bytesToRead_;
+        long fileReadPos = 0;
+        int fileReadIndex = -1;
+        do {
+          try {
+            fileReadIndex = status_.getFileBufferIndex();
+          } catch (InterruptedException e1) {
+            throw new IOException(e1);
+          }
+
+          if (fileReadIndex < 0)
+            break;
+
+          ByteBuffer buffer = bufferList_.get(fileReadIndex);
+          buffer.clear();
+          long readSize = Math.min(buffer.remaining(),
+              (bytesWrittenInFile_ - fileReadPos));
+          // Copy the next chunk of the file into the in-memory buffer.
+          buffer.put(fc.map(MapMode.READ_ONLY, fileReadPos, readSize));
+          buffer.flip();
+          bytesToRead_ -= readSize;
+          fileReadPos += readSize;
+        } while (!closed_ && bytesToRead_ > 0 && fileReadIndex >= 0
+            && fileReadIndex < bufferList_.size());
+      } finally {
+        raf.close(); // also closes the channel obtained from it
+      }
+      closed_ = true;
+      status_.closedBySpiller();
+    }
+
+    /*
+     * Indicate the thread to close.
+     */
+    public void completeRead() {
+      closed_ = true;
+    }
+
+    public boolean isClosed() {
+      return closed_;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      keepReadingFromFile();
+      return Boolean.TRUE;
+    }
+
+  }
+
+  /**
+   * The input stream that encapsulates a previously written spilled buffer that
+   * has a list of byte arrays and a file to read from in the case where
+   * spilling is already done. If spilling has not happened, all the data is
+   * present in the list of byte arrays.
+   * 
+   */
+  static class SpilledInputStream extends InputStream {
+    private String fileName_;
+    private List<ByteBuffer> bufferList_;
+    private boolean spilledAlready_;
+    ReadIndexStatus status_;
+    private final byte[] readByte = new byte[1];
+    private int count;
+    private final byte[] buf;
+    private int pos;
+    private Callable<Boolean> spillReadThread_;
+    private Future<Boolean> spillReadState_;
+    private ExecutorService spillThreadService_;
+    private ByteBuffer currentReadBuffer_;
+    private BitSet bufferBitState_;
+    private boolean closed_;
+
+    public SpilledInputStream(String fileName, boolean direct,
+        List<ByteBuffer> bufferList, boolean hasSpilled) throws IOException {
+      fileName_ = fileName;
+      bufferList_ = bufferList;
+      spilledAlready_ = hasSpilled;
+      bufferBitState_ = new BitSet(bufferList.size());
+
+      if (spilledAlready_) {
+        status_ = new SpilledDataReadStatus(bufferList.size(), bufferBitState_);
+      } else {
+        status_ = new BufferReadStatus(bufferList.size());
+      }
+
+      buf = new byte[8192];
+      count = 0;
+      pos = 0;
+      closed_ = false;
+    }
+
+    /** Starts the file reader if data was spilled and gets the first buffer. */
+    public void prepareRead() throws IOException {
+      if (spilledAlready_) {
+        spillReadThread_ = new SpillReadThread(fileName_, bufferList_,
+            (SpilledDataReadStatus) status_);
+        spillThreadService_ = Executors.newFixedThreadPool(1);
+        spillReadState_ = spillThreadService_.submit(spillReadThread_);
+        status_.startReading();
+      }
+      try {
+        currentReadBuffer_ = getNextBuffer();
+      } catch (InterruptedException e1) {
+        throw new IOException(e1);
+      }
+
+      if (currentReadBuffer_ == null) {
+        if (spilledAlready_) {
+          try {
+            spillReadState_.get();
+          } catch (InterruptedException e) {
+            throw new IOException(e);
+          } catch (ExecutionException e) {
+            throw new IOException(e);
+          } finally {
+            spillThreadService_.shutdownNow();
+          }
+        }
+        throw new IOException("Could not initialize the buffer for reading");
+      }
+    }
+
+    /** Returns the next buffer to read from, or null when none is left. */
+    public ByteBuffer getNextBuffer() throws InterruptedException {
+      int index = status_.getReadBufferIndex();
+      if (index >= 0 && index < bufferList_.size()) {
+        return bufferList_.get(index);
+      }
+      return null;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (count > 0) {
+        --count;
+        return buf[pos++] & 0xFF;
+      }
+
+      if (-1 == read(readByte, 0, 1)) {
+        return -1;
+      }
+      return readByte[0] & 0xFF;
+    }
+
+    /** Reads up to len bytes; returns -1 only if no byte could be read. */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      if (count >= len) {
+        // enough bytes are already staged in buf: serve them directly
+        System.arraycopy(buf, pos, b, off, len);
+        count -= len;
+        pos += len;
+        return len;
+      }
+      int size = 0;
+
+      while (len > 0) {
+        if (count == 0) {
+          // End of data comes back as -1 (or 0); never keep a negative count.
+          count = Math.max(0, readInternal(buf, 0, buf.length));
+          if (count == 0) {
+            return size > 0 ? size : -1;
+          }
+          pos = 0;
+        }
+        int readSize = Math.min(count, len);
+        System.arraycopy(buf, pos, b, off, readSize);
+        len -= readSize;
+        off += readSize;
+        size += readSize;
+        count -= readSize;
+        pos += readSize;
+      }
+      return size;
+    }
+
+    /** Copies up to len bytes from the buffers; -1 when no buffer is left. */
+    public int readInternal(byte[] b, int off, int len) throws IOException {
+      if (currentReadBuffer_ == null) {
+        return -1;
+      }
+      int cur = 0;
+      while (len > 0) {
+        int rem = currentReadBuffer_.remaining();
+        if (rem == 0) {
+          try {
+            currentReadBuffer_ = getNextBuffer();
+          } catch (InterruptedException e) {
+            throw new IOException(e);
+          }
+          if (currentReadBuffer_ == null) {
+            return cur;
+          }
+          rem = currentReadBuffer_.remaining();
+        }
+        int readSize = Math.min(rem, len);
+        currentReadBuffer_.get(b, off, readSize);
+        len -= readSize;
+        off += readSize;
+        cur += readSize;
+      }
+
+      return cur;
+    }
+
+    /** Closes the stream, resets the read state and calls prepareRead(). */
+    public void clear() throws IOException {
+      close();
+      bufferBitState_.clear();
+
+      if (spilledAlready_) {
+        status_ = new SpilledDataReadStatus(bufferList_.size(), bufferBitState_);
+      } else {
+        status_ = new BufferReadStatus(bufferList_.size());
+      }
+
+      count = 0;
+      pos = 0;
+      closed_ = false;
+      prepareRead();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (closed_)
+        return;
+      closed_ = true; // makes the guard above effective: close only once
+      status_.completeReading();
+      if (this.spilledAlready_) {
+        try {
+          this.spillReadState_.get();
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e);
+        } finally {
+          spillThreadService_.shutdownNow();
+        }
+      }
+    }
+
+    public String getFileName() {
+      return fileName_;
+    }
+
+  }
+
+  /** Closes the stream and, if requested, deletes the spill file. */
+  public void completeReading(boolean deleteFile) throws IOException {
+    in.close();
+    if (deleteFile) {
+      File file = new File(
+          ((SpilledDataInputBuffer.SpilledInputStream) in).getFileName());
+      if (file.exists())
+        file.delete();
+    }
+  }
+
+  public SpilledDataInputBuffer(InputStream in) {
+    super(in);
+  }
+
+  /** Delegates to {@link SpilledInputStream#clear()} of the wrapped stream. */
+  public void clear() throws IOException {
+    SpilledInputStream inStream = (SpilledInputStream) this.in;
+    inStream.clear();
+  }
+
+  /** Creates a buffer over an already spilled file and prepares it to read. */
+  public static SpilledDataInputBuffer getSpilledDataInputBuffer(
+      String fileName, boolean direct, List<ByteBuffer> bufferList)
+      throws IOException {
+    SpilledInputStream inStream = new SpilledInputStream(fileName, direct,
+        bufferList, true);
+    inStream.prepareRead();
+    return new SpilledDataInputBuffer(inStream);
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Base interface defining the behaviour to process the spilled data provided
+ * in a byte buffer. Every method reports its outcome through a boolean
+ * instead of an exception.
+ */
+public interface SpilledDataProcessor {
+
+  /**
+   * Initialize the data processor.
+   * 
+   * @param conf the configuration made available to the processor.
+   * @return true if no errors.
+   */
+  boolean init(Configuration conf);
+  
+  /**
+   * Override the method to define the action to be taken on the spilled data
+   * provided in the byte buffer.
+   * 
+   * @param buffer the buffer holding the spilled data to act on.
+   * @return true if no errors.
+   */
+  boolean handleSpilledBuffer(ByteBuffer buffer);
+  
+  /**
+   * Close the data processor.
+   * 
+   * @return true if no errors.
+   */
+  boolean close();
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The implementation of the shared object when the buffer has already
+ * spilled to disk. It hands out circular buffer indexes to two parties: one
+ * asking for the next buffer to read ({@link #getReadBufferIndex()}) and one
+ * asking for the next buffer to fill ({@link #getFileBufferIndex()}). A set
+ * bit in the shared <code>BitSet</code> marks a buffer as filled and not yet
+ * read. All state changes happen under the monitor of this object.
+ */
+class SpilledDataReadStatus extends ReadIndexStatus {
+
+  private static final Log LOG = LogFactory.getLog(SpilledDataReadStatus.class);
+  
+  private volatile int readBufferIndex_;
+  private volatile int fetchFileBufferIndex_;
+  private int totalSize_;
+  private volatile boolean spilledReadStart_;
+  private volatile boolean fileReadComplete_;
+  private volatile boolean bufferReadComplete_;
+  private volatile BitSet bufferBitState_;
+
+  /**
+   * @param totalSize number of buffers; indexes wrap around at this value.
+   * @param bufferBitState bit per buffer, set while the buffer holds data
+   *          that has not been read yet.
+   */
+  public SpilledDataReadStatus(int totalSize, BitSet bufferBitState) {
+    readBufferIndex_ = -1;
+    fetchFileBufferIndex_ = 0;
+    spilledReadStart_ = false;
+    fileReadComplete_ = false;
+    bufferReadComplete_ = false;
+    totalSize_ = totalSize;
+    bufferBitState_ = bufferBitState;
+  }
+
+  /**
+   * Releases the buffer returned by the previous call, advances to the next
+   * index and blocks until that buffer is marked filled or the file has been
+   * read completely.
+   * 
+   * @return the index of the buffer to read, or -1 when the file is fully
+   *         read and no filled buffer is left.
+   * @throws InterruptedException if interrupted while waiting.
+   */
+  @Override
+  public synchronized int getReadBufferIndex() throws InterruptedException {
+
+    assert !bufferReadComplete_;
+
+    if (readBufferIndex_ >= 0) {
+      // The previously returned buffer is consumed; free it for refilling.
+      bufferBitState_.set(readBufferIndex_, false);
+      notify();
+    }
+    readBufferIndex_ = (readBufferIndex_ + 1) % totalSize_;
+    while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_) {
+      wait();
+    }
+    // The file is completely read and transferred to buffers already.
+    if (bufferBitState_.isEmpty() && fileReadComplete_) {
+      return -1;
+    }
+    return readBufferIndex_;
+  }
+
+  /**
+   * Marks the buffer returned by the previous call as filled, advances to
+   * the next index and blocks until that buffer has been released or reading
+   * was completed. The very first call returns index 0 without marking
+   * anything.
+   * 
+   * @return the index of the buffer to fill, or -1 once reading has been
+   *         completed.
+   * @throws InterruptedException if interrupted while waiting.
+   */
+  @Override
+  public synchronized int getFileBufferIndex() throws InterruptedException {
+
+    assert !fileReadComplete_;
+
+    if (bufferReadComplete_) {
+      return -1;
+    }
+
+    // First read
+    if (!spilledReadStart_) {
+      fetchFileBufferIndex_ = 0;
+      spilledReadStart_ = true;
+      notify();
+      return fetchFileBufferIndex_;
+    }
+
+    bufferBitState_.set(fetchFileBufferIndex_, true);
+    notify();
+    fetchFileBufferIndex_ = (fetchFileBufferIndex_ + 1) % totalSize_;
+
+    while (bufferBitState_.get(fetchFileBufferIndex_)
+        && !bufferReadComplete_) {
+      wait();
+    }
+
+    if (bufferReadComplete_) {
+      return -1;
+    }
+
+    return fetchFileBufferIndex_;
+  }
+
+  /**
+   * Flags that the buffers will not be read any further and, unless the file
+   * was already read completely, wakes up a waiting thread.
+   */
+  @Override
+  public synchronized void completeReading() {
+    bufferReadComplete_ = true;
+    if (fileReadComplete_)
+      return;
+    notify();
+  }
+
+  /**
+   * Called by the thread to indicate that all the spilled data is
+   * completely read. The buffer currently being filled is marked as filled.
+   */
+  public synchronized void closedBySpiller() {
+    fileReadComplete_ = true;
+    bufferBitState_.set(fetchFileBufferIndex_, true);
+    notify();
+  }
+
+  /**
+   * Blocks until the first call to {@link #getFileBufferIndex()} has been
+   * made.
+   * 
+   * @throws RuntimeException wrapping the <code>InterruptedException</code>
+   *           if the wait is interrupted; the interrupt status of the
+   *           thread is restored first.
+   */
+  @Override
+  public synchronized void startReading() {
+    while (!spilledReadStart_) {
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted waiting to read the spilled file.", e);
+        // Converting to an unchecked exception must not lose the interrupt.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+}
+

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.bsp.message.io.SpillWriteIndexStatus;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataProcessor;
+import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
+import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
+
+/**
+ * <code>SpillingBuffer</code> is an output stream comprised of byte arrays that
+ * keeps values in heap until a particular threshold is reached. Once this
+ * threshold is exceeded, the values are spilled to disk and all the contents of
+ * the buffer is written to a file until the stream is closed. The
+ * implementation uses a list of byte arrays and hence a user of this class may
+ * provide the size of each byte array to hold the data. The threshold could
+ * also be specified provided it exceeds the size of a single byte array. Once
+ * the stream is closed, the class provides an input stream to read the data
+ * written which may or may not have been spilled.
+ * 
+ */
+public class SpillingDataOutputBuffer extends DataOutputStream {
+
+  private static final Log LOG = LogFactory.getLog(SpillingDataOutputBuffer.class);
+
+  /**
+   * This thread is responsible for writing from the ByteBuffers in the list to
+   * the file as they get available.
+   */
+  static class ProcessSpilledDataThread implements Callable<Boolean> {
+    private SpillWriteIndexStatus status_;
+    private List<ByteBuffer> bufferList_;
+    private long fileWrittenSize_;
+    private boolean closed;
+    SpilledDataProcessor processor;
+
+    ProcessSpilledDataThread(SpillWriteIndexStatus status,
+        List<ByteBuffer> bufferList, SpilledDataProcessor processor) {
+      status_ = status;
+      bufferList_ = bufferList;
+      closed = false;
+      this.processor = processor;
+    }
+
+    /**
+     * Keep writing to the file as the buffers gets available.
+     * 
+     * @throws IOException when the thread is interrupted while waiting to get
+     *           the index of the buffer to written to the file.
+     */
+    private void keepProcessingData() throws IOException {
+
+      int fileWriteIndex = -1;
+      do {
+
+        try {
+          fileWriteIndex = status_.getNextProcessorBufferIndex();
+        } catch (InterruptedException e1) {
+          throw new IOException(e1);
+        }
+        while (fileWriteIndex >= 0) {
+          ByteBuffer buffer = bufferList_.get(fileWriteIndex);
+          processor.handleSpilledBuffer(buffer);
+          buffer.clear();
+          try {
+            fileWriteIndex = status_.getNextProcessorBufferIndex();
+          } catch (InterruptedException e) {
+            LOG.error("Interrupted getting next index to process data.", e);
+            throw new IOException(e);
+          }
+
+        }
+      } while (!closed);
+
+
+    }
+
+    /*
+     * Indicate the thread that the spilling process is complete.
+     */
+    public void completeSpill() {
+      closed = true;
+    }
+
+    /**
+     * Gets the size of file written in bytes.
+     * 
+     * @return the size of file written.
+     */
+    public long getFileWrittenSize() {
+      return fileWrittenSize_;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      keepProcessingData();
+      return Boolean.TRUE;
+    }
+
+  }
+
+  /**
+   * This class is responsible for holding the <code>ByteBuffer</code> arrays
+   * and writing data into the buffers. Once the the threshold is crossed it
+   * invokes a spilling thread that would spill the data from the buffer to the
+   * disk.
+   * 
+   */
+  static class SpillingStream extends OutputStream {
+
+    final byte[] b;
+    final boolean direct_;
+
+    private List<ByteBuffer> bufferList_;
+    private int bufferSize_;
+    private BitSet bufferState_;
+    private int numberBuffers_;
+    private ByteBuffer currentBuffer_;
+    private long bytesWritten_;
+    private long bytesRemaining_;
+    private SpillWriteIndexStatus spillStatus_;
+    private int thresholdSize_;
+    private boolean startedSpilling_;
+    private ProcessSpilledDataThread spillThread_;
+    private ExecutorService spillThreadService_;
+    private Future<Boolean> spillThreadState_;
+    private boolean closed_;
+
+    private SpilledDataProcessor processor;
+    /**
+     * The internal buffer where data is stored.
+     */
+    protected byte buf[];
+
+    /**
+     * The number of valid bytes in the buffer. This value is always in the
+     * range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt>
+     * through <tt>buf[count-1]</tt> contain valid byte data.
+     */
+    protected int count;
+
+    /**
+     * Default intermediate buffer size;
+     */
+    protected int defaultBufferSize_;
+
+    /**
+     * Creates the stream with an intermediate buffer size of 8192 bytes.
+     * 
+     * @param numBuffers The number of ByteBuffer the class should hold
+     * @param bufferSize The size of each ByteBuffer.
+     * @param threshold The threshold after which the spilling should start.
+     * @param direct true indicates the ByteBuffer be allocated direct
+     * @param processor The processor that receives the spilled buffers.
+     */
+    SpillingStream(int numBuffers, int bufferSize, int threshold,
+        boolean direct, SpilledDataProcessor processor) {
+      this(numBuffers, bufferSize, threshold, direct, processor, 8192);
+
+    }
+
+    /**
+     * Creates the stream and allocates the initial buffers.
+     * 
+     * @param numBuffers The number of ByteBuffer the class should hold
+     * @param bufferSize The size of each ByteBuffer.
+     * @param threshold The threshold after which the spilling should start.
+     *          Asserted to be at least <code>bufferSize</code> and below
+     *          <code>numBuffers * bufferSize</code>.
+     * @param direct true indicates the ByteBuffer be allocated direct
+     * @param processor The processor that receives the spilled buffers.
+     * @param interBufferSize The size of the intermediate byte array; a value
+     *          above <code>bufferSize</code> is replaced by
+     *          <code>bufferSize / 2</code>.
+     */
+    SpillingStream(int numBuffers, int bufferSize, int threshold,
+        boolean direct, SpilledDataProcessor processor, int interBufferSize) {
+
+      assert (threshold >= bufferSize);
+      assert (threshold < numBuffers * bufferSize);
+      if(interBufferSize > bufferSize){
+        interBufferSize = bufferSize/2;
+      }
+      defaultBufferSize_ = interBufferSize;
+      this.b = new byte[1];
+      this.buf = new byte[defaultBufferSize_];
+      count = 0;
+      direct_ = direct;
+      numberBuffers_ = numBuffers;
+      bufferSize_ = bufferSize;
+      bufferList_ = new ArrayList<ByteBuffer>(numberBuffers_);
+      bufferState_ = new BitSet(numBuffers);
+
+      // Only half of the buffers are allocated up front; getBuffer() adds
+      // one when an index beyond the current list is requested.
+      for (int i = 0; i < numBuffers / 2; ++i) {
+        if (direct_) {
+          bufferList_.add(ByteBuffer.allocateDirect(bufferSize_));
+        } else {
+          bufferList_.add(ByteBuffer.allocate(bufferSize_));
+        }
+      }
+      currentBuffer_ = bufferList_.get(0);
+      bytesWritten_ = 0L;
+      bytesRemaining_ = bufferSize_;
+      spillStatus_ = new SpillWriteIndexStatus(bufferSize, numberBuffers_, 0,
+          -1, bufferState_);
+      thresholdSize_ = threshold;
+      startedSpilling_ = false;
+      spillThread_ = null;
+      spillThreadState_ = null;
+      this.processor = processor;
+      closed_ = false;
+
+    }
+    
+    /**
+     * Discards everything held by the stream: the intermediate byte array,
+     * the buffer state bits and the content of every allocated buffer.
+     * Writing restarts at the first buffer.
+     * 
+     * NOTE(review): <code>closed_</code> and <code>spillStatus_</code> are
+     * not reset here - confirm whether the stream is meant to be reusable
+     * after a flush.
+     * 
+     * @throws IOException declared by the inherited <code>close()</code>.
+     */
+    public void clear() throws IOException{
+      // This class does not override close(), so this is the inherited
+      // java.io.OutputStream#close().
+      this.close();
+      // Bytes staged in buf but not flushed yet must not leak into the data
+      // written after the clear.
+      count = 0;
+      startedSpilling_ = false;
+      bufferState_.clear();
+
+      for (int i = 0; i < bufferList_.size(); ++i) {
+        bufferList_.get(i).clear();
+      }
+      currentBuffer_ = bufferList_.get(0);
+      bytesWritten_ = 0L;
+      bytesRemaining_ = bufferSize_;
+    }
+
+    /**
+     * Stages a single byte. While more than one slot is free the byte goes
+     * straight into the intermediate array; otherwise it is routed through
+     * the array based write.
+     */
+    @Override
+    public void write(int b) throws IOException {
+      final byte value = (byte) (b & 0xFF);
+      if (count >= buf.length - 1) {
+        this.b[0] = value;
+        write(this.b);
+        return;
+      }
+      buf[count++] = value;
+    }
+
+    /**
+     * Writes the whole array by delegating to
+     * <code>write(b, 0, b.length)</code>.
+     */
+    @Override
+    public void write(byte[] b) throws IOException {
+      write(b, 0, b.length);
+    }
+
+    /**
+     * Creates the spilling task, submits it to a newly created single
+     * thread executor and notifies the spill status that spilling started.
+     * The caller decides when the threshold has been reached.
+     * 
+     * @throws InterruptedException
+     */
+    private void startSpilling() throws InterruptedException {
+      synchronized (this) {
+        spillThread_ = new ProcessSpilledDataThread(spillStatus_, bufferList_,
+            processor);
+        startedSpilling_ = true;
+        spillThreadService_ = Executors.newFixedThreadPool(1);
+        spillThreadState_ = spillThreadService_.submit(spillThread_);
+        spillStatus_.startSpilling();
+      }
+    }
+
+    /**
+     * Writes the given range so that every buffer is filled to its limit
+     * before the next one is started. Neither the written byte count nor the
+     * spilling threshold is touched by this method.
+     * 
+     * @param b source array.
+     * @param off offset of the first byte to write.
+     * @param len number of bytes to write.
+     * @throws IOException declared by <code>getBuffer</code>.
+     */
+    public void perfectFillWrite(byte[] b, int off, int len) throws IOException {
+      int offset = off;
+      int pending = len;
+      for (int room = currentBuffer_.remaining(); pending > room;
+          room = currentBuffer_.remaining()) {
+        // Top up the current buffer, seal it and move on to the next one.
+        currentBuffer_.put(b, offset, room);
+        currentBuffer_.flip();
+        currentBuffer_ = getBuffer(spillStatus_.getNextBufferIndex());
+        offset += room;
+        pending -= room;
+      }
+      currentBuffer_.put(b, offset, pending);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      if (len >= buf.length) {
+        /*
+         * If the request length exceeds the size of the output buffer, flush
+         * the output buffer and then write the data directly. In this way
+         * buffered streams will cascade harmlessly.
+         */
+        flushBuffer();
+        writeInternal(b, off, len);
+        return;
+      }
+      if (len > buf.length - count) {
+        flushBuffer();
+      }
+      System.arraycopy(b, off, buf, count, len);
+      count += len;
+    }
+
+    /**
+     * Copies the range into the ByteBuffers and starts the spilling task
+     * once the number of bytes written reaches the threshold. A piece that
+     * does not fit into the room left in the current buffer goes into the
+     * next buffer as a whole. A range longer than one buffer is cut into
+     * pieces of at most <code>bufferSize_</code> bytes; previously such a
+     * range overflowed the ByteBuffer.
+     * 
+     * @param b source array.
+     * @param off offset of the first byte to write.
+     * @param len number of bytes to write.
+     * @throws IOException if starting the spilling task is interrupted.
+     */
+    private void writeInternal(byte[] b, int off, int len) throws IOException {
+
+      bytesWritten_ += len;
+      if (bytesWritten_ >= thresholdSize_ && !startedSpilling_) {
+        try {
+          startSpilling();
+        } catch (InterruptedException e) {
+          throw new IOException("Internal error occured writing to buffer.", e);
+        }
+      }
+
+      int offset = off;
+      int pending = len;
+      while (pending > 0) {
+        int chunk = Math.min(pending, bufferSize_);
+        if (chunk > bytesRemaining_) {
+          currentBuffer_.flip();
+          currentBuffer_ = getBuffer(spillStatus_.getNextBufferIndex());
+          bytesRemaining_ = bufferSize_;
+        }
+        currentBuffer_.put(b, offset, chunk);
+        bytesRemaining_ -= chunk;
+        offset += chunk;
+        pending -= chunk;
+      }
+
+    }
+
+    /**
+     * Moves the bytes staged in the intermediate array into the ByteBuffers
+     * and marks the array as empty. Does nothing when no byte is staged.
+     */
+    private void flushBuffer() throws IOException {
+      if (count <= 0) {
+        return;
+      }
+      writeInternal(buf, 0, count);
+      count = 0;
+    }
+
+    /**
+     * Gets the ByteBuffer from the buffer list. When the index lies beyond
+     * the current list a new buffer of <code>bufferSize_</code> bytes is
+     * allocated (direct or heap, depending on <code>direct_</code>) and
+     * inserted at that index first.
+     * 
+     * @param index position of the requested buffer.
+     * @return the buffer stored at the index.
+     * @throws IOException
+     */
+    ByteBuffer getBuffer(int index) throws IOException {
+      if (index < bufferList_.size()) {
+        return bufferList_.get(index);
+      }
+      ByteBuffer fresh = direct_ ? ByteBuffer.allocateDirect(bufferSize_)
+          : ByteBuffer.allocate(bufferSize_);
+      bufferList_.add(index, fresh);
+      return bufferList_.get(index);
+    }
+
+    /**
+     * Flushes the intermediate byte array into the ByteBuffers and then
+     * closes the spilling process through <code>flushInternal()</code>.
+     */
+    public void flush() throws IOException {
+      flushBuffer();
+      flushInternal();
+    }
+
+    /**
+     * Seals the current buffer and reports the end of spilling to the spill
+     * status. When the spilling task had been started, waits for it to end,
+     * then closes the processor and shuts the executor down.
+     * 
+     * NOTE(review): <code>closed_</code> is only raised when spilling had
+     * started, so without spilling a second call flips the current buffer
+     * again - confirm whether that is intended.
+     * 
+     * @throws IOException if the spilling task failed, reported an
+     *           unsuccessful completion, or the wait for it was interrupted.
+     */
+    public void flushInternal() throws IOException {
+      if (closed_)
+        return;
+
+      currentBuffer_.flip();
+      spillStatus_.spillCompleted();
+      if (!this.startedSpilling_) {
+        return;
+      }
+      this.spillThread_.completeSpill();
+      try {
+        boolean completionState = spillThreadState_.get();
+        if (!completionState) {
+          throw new IOException(
+              "Spilling Thread failed to complete sucessfully.");
+        }
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to callers further up the stack.
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      } finally {
+        closed_ = true;
+        this.processor.close();
+        this.spillThreadService_.shutdownNow();
+      }
+    }
+
+  }
+
+  /**
+   * Initialize the spilling buffer with spilling file name. The stream uses
+   * 3 direct buffers of 16 * 1024 bytes each, a threshold of 16 * 1024 bytes
+   * and a <code>WriteSpilledDataProcessor</code> on the given file.
+   * 
+   * @param fileName name of the file.
+   * @throws FileNotFoundException declared by the
+   *           <code>WriteSpilledDataProcessor</code> constructor.
+   */
+  public SpillingDataOutputBuffer(String fileName) throws FileNotFoundException {
+    super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, 
+        new WriteSpilledDataProcessor(fileName)));
+  }
+  
+  /**
+   * Initialize the spilling buffer with the given processor. The stream uses
+   * 3 direct buffers of 16 * 1024 bytes each and a threshold of 16 * 1024
+   * bytes.
+   * 
+   * @param processor the processor that receives the spilled buffers.
+   * @throws FileNotFoundException declared, although nothing in this
+   *           constructor opens a file.
+   */
+  public SpillingDataOutputBuffer(SpilledDataProcessor processor) throws FileNotFoundException {
+    super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, 
+        processor));
+  }
+  
+  
+
+  /**
+   * Initializes the spilling buffer. The spill file is placed in the
+   * directory named by the <code>java.io.tmpdir</code> system property and
+   * gets a random name made of a 128 bit number printed in radix 32.
+   * 
+   * @throws FileNotFoundException declared by the
+   *           <code>WriteSpilledDataProcessor</code> constructor.
+   */
+  public SpillingDataOutputBuffer() throws FileNotFoundException {
+    super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
+        new WriteSpilledDataProcessor(
+        System.getProperty("java.io.tmpdir") + File.separatorChar
+            + new BigInteger(128, new SecureRandom()).toString(32))));
+  }
+
+  /**
+   * Initializes the spilling buffer with explicit settings.
+   * 
+   * @param bufferCount the number of ByteBuffer the stream should hold.
+   * @param bufferSize the size of each ByteBuffer.
+   * @param threshold the threshold after which the spilling should start.
+   * @param direct true indicates the ByteBuffer be allocated direct.
+   * @param processor the processor that receives the spilled buffers.
+   */
+  public SpillingDataOutputBuffer(int bufferCount, int bufferSize, int threshold,
+      boolean direct, SpilledDataProcessor processor) {
+    super(new SpillingStream(bufferCount, bufferSize, threshold, direct,
+        processor));
+  }
+  
+  /**
+   * Delegates to <code>SpillingStream.clear()</code> on the wrapped stream.
+   * 
+   * @throws IOException if the wrapped stream fails to clear itself.
+   */
+  public void clear() throws IOException {
+    ((SpillingStream) this.out).clear();
+  }
+  
+  public boolean hasSpilled(){
+    return ((SpillingStream) this.out).startedSpilling_;
+  }
+
+  /**
+   * Provides an input stream to read from the spilling buffer. The reading
+   * side receives the buffer list, the direct flag and the spilling flag of
+   * the wrapped stream.
+   * 
+   * @param fileName name handed to the input stream.
+   * @return an input buffer over the prepared input stream.
+   * @throws IOException
+   */
+  public SpilledDataInputBuffer getInputStreamToRead(String fileName) throws IOException {
+    SpillingStream writer = (SpillingStream) this.out;
+    SpilledDataInputBuffer.SpilledInputStream reader =
+        new SpilledDataInputBuffer.SpilledInputStream(fileName,
+            writer.direct_, writer.bufferList_, writer.startedSpilling_);
+    reader.prepareRead();
+    SpilledDataInputBuffer input = new SpilledDataInputBuffer(reader);
+    return input;
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * 
+ *
+ */
+public class WriteSpilledDataProcessor implements SpilledDataProcessor {
+
+  private static final Log LOG = LogFactory
+      .getLog(WriteSpilledDataProcessor.class);
+
+  private FileChannel fileChannel;
+  private RandomAccessFile raf;
+  private String fileName;
+
+  public WriteSpilledDataProcessor(String fileName)
+      throws FileNotFoundException {
+    this.fileName = fileName;
+    raf = new RandomAccessFile(fileName, "rw");
+    fileChannel = raf.getChannel();
+  }
+
+  @Override
+  public boolean init(Configuration conf) {
+    return true;
+  }
+
+  @Override
+  public boolean handleSpilledBuffer(ByteBuffer buffer) {
+    try {
+      fileChannel.write(buffer);
+      return true;
+    } catch (IOException e) {
+      LOG.error("Error writing to file:"+fileName, e);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean close() {
+    try {
+      fileChannel.close();
+    } catch (IOException e) {
+      LOG.error("Error writing to file:" + fileName, e);
+    }
+    return true;
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.io.PreFetchCache;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataProcessor;
+import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
+import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
+
+/**
+ * 
+ *
+ * @param <M>
+ */
+public class SpillingQueue<M extends Writable> implements MessageQueue<M> {
+
+  private static final Log LOG = LogFactory.getLog(SpillingQueue.class);
+
+  private Configuration conf;
+  private SpillingDataOutputBuffer spillOutputBuffer;
+  private int numMessagesWritten;
+  private int numMessagesRead;
+
+  public final static String SPILLBUFFER_COUNT = "hama.io.spillbuffer.count";
+  public final static String SPILLBUFFER_SIZE = "hama.io.spillbuffer.size";
+  public final static String SPILLBUFFER_FILENAME = "hama.io.spillbuffer.filename";
+  public final static String SPILLBUFFER_THRESHOLD = "hama.io.spillbuffer.threshold";
+  public final static String SPILLBUFFER_DIRECT = "hama.io.spillbuffer.direct";
+  public final static String ENABLE_PREFETCH = "hama.io.spillbuffer.enableprefetch";
+  public final static String SPILLBUFFER_MSGCLASS = "hama.io.spillbuffer.msgclass";
+  private int bufferCount;
+  private int bufferSize;
+  private String fileName;
+  private int threshold;
+  private boolean direct;
+  private SpilledDataInputBuffer spilledInput;
+  private boolean objectWritableMode;
+  private ObjectWritable objectWritable;
+  
+  private Class<M> messageClass;
+  private PreFetchCache<M> prefetchCache;
+  private boolean enablePrefetch;
+
+  
+  /**
+   * Iterator that consumes the queue through the poll methods. In object
+   * mode it calls <code>poll()</code>; otherwise it calls
+   * <code>poll(M)</code> with one message instance created up front, so the
+   * same instance is handed to every such call.
+   */
+  private class SpillIterator implements Iterator<M> {
+
+    private boolean objectMode;
+    private Class<M> classObject;
+    private M messageHolder;
+
+    public SpillIterator(boolean mode, Class<M> classObj, Configuration conf) {
+      this.objectMode = mode;
+      this.classObject = classObj;
+      if (classObj != null) {
+        // Holder instance passed to poll(M) on every next() call.
+        this.messageHolder = ReflectionUtils.newInstance(classObj, conf);
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return numMessagesWritten > 0 && numMessagesRead != numMessagesWritten;
+    }
+
+    @Override
+    public M next() {
+      return objectMode ? poll() : poll(messageHolder);
+    }
+
+    @Override
+    public void remove() {
+      // do nothing
+    }
+
+  }
+
+  /**
+   * @return a new <code>SpillIterator</code> built from the current mode,
+   *         message class and configuration of this queue.
+   */
+  @Override
+  public Iterator<M> iterator() {
+    return new SpillIterator(objectWritableMode, messageClass, conf);
+  }
+
+  /**
+   * Stores the configuration and passes it on to the
+   * <code>ObjectWritable</code> helper once that helper exists.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    // objectWritable is only created in init(); calling setConf() before
+    // init() used to fail with a NullPointerException.
+    if (this.objectWritable != null) {
+      this.objectWritable.setConf(conf);
+    }
+  }
+
+  /**
+   * @return the configuration last stored by <code>setConf</code> or
+   *         <code>init</code>.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void add(M msg) {
+    try {
+      if (objectWritableMode) {
+        objectWritable.set(msg);
+        objectWritable.write(spillOutputBuffer);
+      } else {
+        msg.write(spillOutputBuffer);
+      }
+      ++numMessagesWritten;
+    } catch (IOException e) {
+      LOG.error("Error adding message.", e);
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  /**
+   * Adds every message of the collection, in iteration order.
+   */
+  @Override
+  public void addAll(Collection<M> msgs) {
+    Iterator<M> it = msgs.iterator();
+    while (it.hasNext()) {
+      add(it.next());
+    }
+  }
+
+  /**
+   * Adds every message the iterator of the other queue yields.
+   */
+  @Override
+  public void addAll(MessageQueue<M> arg0) {
+    for (Iterator<M> source = arg0.iterator(); source.hasNext();) {
+      add(source.next());
+    }
+  }
+
+  /**
+   * Closes and clears the spill output buffer and, when an input buffer
+   * exists, closes and clears that one as well.
+   * 
+   * @throws RuntimeException wrapping the <code>IOException</code> if any
+   *           of the streams fails.
+   */
+  @Override
+  public void clear() {
+    try {
+      spillOutputBuffer.close();
+      spillOutputBuffer.clear();
+      // init() does not create the input buffer, so it may still be unset.
+      if (spilledInput != null) {
+        spilledInput.close();
+        spilledInput.clear();
+      }
+    } catch (IOException e) {
+      LOG.error("Error clearing spill stream.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Closes the spill output buffer and, when an input buffer exists, closes
+   * it and completes reading with deletion of the spill file.
+   * 
+   * @throws RuntimeException wrapping the <code>IOException</code> if any
+   *           of the streams fails.
+   */
+  @Override
+  public void close() {
+
+    try {
+      this.spillOutputBuffer.close();
+      // init() does not create the input buffer, so it may still be unset.
+      if (this.spilledInput != null) {
+        this.spilledInput.close();
+        this.spilledInput.completeReading(true);
+      }
+    } catch (IOException e) {
+      LOG.error("Error closing the spilled input stream.", e);
+      throw new RuntimeException(e);
+    }
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void init(Configuration conf, TaskAttemptID arg1) {
+
+    bufferCount = conf.getInt(SPILLBUFFER_COUNT, 3);
+    bufferSize = conf.getInt(SPILLBUFFER_SIZE, 16 * 1024);
+    direct = conf.getBoolean(SPILLBUFFER_DIRECT, true);
+    threshold = conf.getInt(SPILLBUFFER_THRESHOLD, 16 * 1024);
+    fileName = conf.get(SPILLBUFFER_FILENAME,
+        System.getProperty("java.io.tmpdir") + File.separatorChar
+            + new BigInteger(128, new SecureRandom()).toString(32));
+    
+    messageClass = (Class<M>) conf.getClass(SPILLBUFFER_MSGCLASS, null);
+    objectWritableMode = messageClass == null;
+    
+    SpilledDataProcessor processor;
+    try {
+      processor = new WriteSpilledDataProcessor(fileName);
+    } catch (FileNotFoundException e) {
+      LOG.error("Error initializing spilled data stream.", e);
+      throw new RuntimeException(e);
+    }
+    spillOutputBuffer = new SpillingDataOutputBuffer(bufferCount, bufferSize,
+        threshold, direct, processor);
+    objectWritable = new ObjectWritable();
+    objectWritable.setConf(conf);
+    this.conf = conf;
+  }
+  
+  /** Adds one to {@code numMessagesRead}; used by the read paths. */
+  private void incReadMsgCount() {
+    numMessagesRead++;
+  }
+  
+  /**
+   * Reads the next message straight from the spilled input stream.
+   * <p>
+   * In ObjectWritable mode the shared {@code objectWritable} is refilled and
+   * its payload returned; otherwise the supplied holder is refilled and
+   * returned.
+   *
+   * @param msg holder to deserialize into when a message class is configured
+   * @return the message read, or {@code null} when all written messages have
+   *         been read or when reading raised an {@link IOException} (which is
+   *         logged, not rethrown)
+   */
+  @SuppressWarnings("unchecked")
+  private M readDirect(M msg) {
+    if (numMessagesRead >= numMessagesWritten) {
+      return null;
+    }
+    M result = null;
+    try {
+      if (!objectWritableMode) {
+        msg.readFields(spilledInput);
+        incReadMsgCount();
+        result = msg;
+      } else {
+        objectWritable.readFields(spilledInput);
+        incReadMsgCount();
+        result = (M) objectWritable.get();
+      }
+    } catch (IOException ioe) {
+      // A failed read is reported to the caller as "no message".
+      LOG.error("Error getting values from spilled input", ioe);
+    }
+    return result;
+  }
+
+  public M poll(M msg) {
+    if(numMessagesRead >= numMessagesWritten){
+      return null;
+    }
+    if(enablePrefetch){
+      return readFromPrefetch(msg);
+    }
+    else {
+      return readDirect(msg);
+    }
+  }
+  
+  /**
+   * Reads the next ObjectWritable-wrapped message from the spilled input
+   * stream and returns its payload.
+   *
+   * @return the payload read, or {@code null} when reading raised an
+   *         {@link IOException} (which is logged, not rethrown)
+   * @throws IllegalStateException if the queue is not in ObjectWritable mode
+   */
+  @SuppressWarnings("unchecked")
+  private M readDirectObjectWritable() {
+    if (objectWritableMode) {
+      try {
+        objectWritable.readFields(spilledInput);
+        incReadMsgCount();
+        return (M) objectWritable.get();
+      } catch (IOException ioe) {
+        LOG.error("Error getting values from spilled input", ioe);
+        return null;
+      }
+    }
+    throw new IllegalStateException(
+        "API call not supported. Set the configuration property "
+            + "'hama.io.spillbuffer.newmsginit' to true.");
+  }
+  
+  /**
+   * Takes the next entry from the prefetch cache and counts it as read.
+   * <p>
+   * In ObjectWritable mode the cache entry replaces the shared
+   * {@code objectWritable} and its payload is returned; otherwise the cache
+   * entry itself is returned.
+   *
+   * @param msg not used by this method
+   * @return the message obtained from the prefetch cache
+   */
+  @SuppressWarnings({ "unchecked" })
+  private M readFromPrefetch(M msg) {
+    if (!objectWritableMode) {
+      incReadMsgCount();
+      return (M) this.prefetchCache.get();
+    }
+    this.objectWritable = (ObjectWritable) prefetchCache.get();
+    incReadMsgCount();
+    return (M) this.objectWritable.get();
+  }
+
+  @Override
+  public M poll() {
+    if(numMessagesRead >= numMessagesWritten){
+      return null;
+    }
+    
+    if(enablePrefetch){
+      M msg = readFromPrefetch(null);
+      if(msg != null)
+        incReadMsgCount();
+      return msg;
+    }
+    else {
+      return readDirectObjectWritable();
+    }
+  }
+
+  /**
+   * Switches the queue from writing to reading.
+   * <p>
+   * The spill output buffer is closed and an input stream is obtained from it
+   * through {@code getInputStreamToRead(fileName)}. When the property named by
+   * {@code ENABLE_PREFETCH} is true, a {@code PreFetchCache} sized to the
+   * number of written messages is created and started on that stream.
+   *
+   * @throws RuntimeException if the output buffer cannot be closed, the input
+   *           stream cannot be obtained, or the thread is interrupted while
+   *           prefetching is started; in the last case the thread's interrupt
+   *           status is restored before the exception is thrown
+   */
+  @Override
+  public void prepareRead() {
+    try {
+      spillOutputBuffer.close();
+    } catch (IOException e) {
+      LOG.error("Error closing spilled buffer", e);
+      throw new RuntimeException(e);
+    }
+    try {
+      spilledInput = spillOutputBuffer.getInputStreamToRead(fileName);
+    } catch (IOException e) {
+      LOG.error("Error initializing the input spilled stream", e);
+      throw new RuntimeException(e);
+    }
+    if (conf.getBoolean(ENABLE_PREFETCH, false)) {
+      this.prefetchCache = new PreFetchCache<M>(numMessagesWritten);
+      this.enablePrefetch = true;
+      try {
+        this.prefetchCache.startFetching(this.messageClass, spilledInput, conf);
+      } catch (InterruptedException e) {
+        // Catching InterruptedException clears the interrupt flag; set it
+        // again so code further up the stack can still see the interruption.
+        Thread.currentThread().interrupt();
+        LOG.error("Error starting prefetch on message queue.", e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * Starts a write phase by setting the written-message counter back to
+   * zero.
+   */
+  @Override
+  public void prepareWrite() {
+    this.numMessagesWritten = 0;
+  }
+
+  /**
+   * Returns the running count of messages stored through {@code add(M)}.
+   * <p>
+   * The poll methods advance a separate read counter, so this value is not
+   * reduced as messages are read.
+   *
+   * @return the value of {@code numMessagesWritten}
+   */
+  @Override
+  public int size() {
+    return this.numMessagesWritten;
+  }
+
+}

Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.EOFException;
+import java.io.File;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
+import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for {@link SpillingDataOutputBuffer} and
+ * {@link SpilledDataInputBuffer}: buffering without a spill, spilling to a
+ * file, and reading the spilled data back.
+ */
+public class TestMessageIO extends TestCase {
+
+  /** Payload written repeatedly by every test. */
+  private static final String MESSAGE = "Testing the spillage of spilling buffer";
+
+  /** Number of copies of {@link #MESSAGE} written per test. */
+  private static final int MESSAGE_COUNT = 100;
+
+  /** Bytes the tests expect {@link #MESSAGE_COUNT} serialized copies to occupy. */
+  private static final int EXPECTED_BYTES = 4000;
+
+  /** Builds a random file name inside the JVM's temporary directory. */
+  private static String newSpillFileName() {
+    return System.getProperty("java.io.tmpdir") + File.separatorChar
+        + new BigInteger(128, new SecureRandom()).toString(32);
+  }
+
+  /** Writes {@link #MESSAGE_COUNT} copies of {@link #MESSAGE} to the buffer. */
+  private static void writeMessages(SpillingDataOutputBuffer outputBuffer)
+      throws Exception {
+    Text text = new Text(MESSAGE);
+    for (int i = 0; i < MESSAGE_COUNT; ++i) {
+      text.write(outputBuffer);
+    }
+  }
+
+  /**
+   * A default-constructed buffer is expected to hold all test data without
+   * spilling.
+   *
+   * @throws Exception
+   */
+  public void testNonSpillBuffer() throws Exception {
+
+    SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer();
+    writeMessages(outputBuffer);
+
+    assertEquals(EXPECTED_BYTES, outputBuffer.size());
+    assertFalse(outputBuffer.hasSpilled());
+
+    outputBuffer.close();
+
+  }
+
+  /**
+   * A buffer built with small limits (2, 1024, 1024) is expected to spill
+   * the test data into the configured file.
+   *
+   * @throws Exception
+   */
+  public void testSpillBuffer() throws Exception {
+
+    String fileName = newSpillFileName();
+    SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
+        1024, 1024, true, new WriteSpilledDataProcessor(fileName));
+    writeMessages(outputBuffer);
+
+    assertEquals(EXPECTED_BYTES, outputBuffer.size());
+    assertTrue(outputBuffer.hasSpilled());
+    File f = new File(fileName);
+    assertTrue(f.exists());
+    // Close before deleting: removing a file that is still held open fails
+    // on some platforms.
+    outputBuffer.close();
+    assertTrue(f.delete());
+
+  }
+
+  /**
+   * Spilled data must be readable message by message through the input
+   * buffer, reading past the end must raise {@link EOFException}, and
+   * {@code completeReading(true)} must remove the spill file.
+   *
+   * @throws Exception
+   */
+  public void testSpillInputStream() throws Exception {
+    String fileName = newSpillFileName();
+    SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
+        1024, 1024, true, new WriteSpilledDataProcessor(fileName));
+    writeMessages(outputBuffer);
+
+    assertEquals(EXPECTED_BYTES, outputBuffer.size());
+    assertTrue(outputBuffer.hasSpilled());
+    File f = new File(fileName);
+    assertTrue(f.exists());
+    outputBuffer.close();
+    assertEquals(EXPECTED_BYTES, f.length());
+
+    SpilledDataInputBuffer inputBuffer = outputBuffer
+        .getInputStreamToRead(fileName);
+
+    Text text = new Text();
+    for (int i = 0; i < MESSAGE_COUNT; ++i) {
+      text.readFields(inputBuffer);
+      assertEquals(MESSAGE, text.toString());
+      // Wipe the holder so a read that left it untouched cannot pass the
+      // next comparison with stale content.
+      text.clear();
+    }
+
+    try {
+      text.readFields(inputBuffer);
+      fail("Reading past the last message should raise EOFException");
+    } catch (EOFException expected) {
+      // Expected: the stream holds exactly MESSAGE_COUNT messages.
+    }
+
+    inputBuffer.close();
+    inputBuffer.completeReading(false);
+    assertTrue(f.exists());
+    inputBuffer.completeReading(true);
+    assertFalse(f.exists());
+
+  }
+
+}

Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.bsp.message.queue.SpillingQueue;
+
+public class TestSpillingQueue extends TestCase {
+
+  /**
+   * Test the spilling queue where the message class is specified.
+   * @throws Exception
+   */
+  public void testTextSpillingQueue() throws Exception {
+
+    String msg = "Testing the spillage of spilling buffer";
+    Text text = new Text(msg);
+    TaskAttemptID id = new TaskAttemptID(new TaskID("123", 1, 2), 0);
+    SpillingQueue<Text> queue = new SpillingQueue<Text>();
+    Configuration conf = new HamaConfiguration();
+    
+    String fileName = 
+        System.getProperty("java.io.tmpdir") + File.separatorChar
+        + new BigInteger(128, new SecureRandom()).toString(32);
+    File file = new File(fileName);
+    conf.set(SpillingQueue.SPILLBUFFER_FILENAME, fileName);
+    conf.setClass(SpillingQueue.SPILLBUFFER_MSGCLASS, Text.class,
+        Writable.class);
+    queue.init(conf, id);
+    queue.prepareWrite();
+    for(int i = 0; i < 1000; ++i){
+      queue.add(text);
+    }
+    queue.prepareRead();
+    for(Text t: queue){
+      assertTrue(msg.equals(t.toString()));
+      text.clear();
+    }
+    
+    assertTrue(queue.poll() == null);
+    
+    assertTrue(file.exists());
+    queue.close();
+    assertFalse(file.exists());
+  }
+  
+  /**
+   * Test the spilling queue where the message class is not specified and the
+   * queue uses ObjectWritable to store messages.
+   * @throws Exception
+   */
+  public void testObjectWritableSpillingQueue() throws Exception {
+
+    String msg = "Testing the spillage of spilling buffer";
+    Text text = new Text(msg);
+    TaskAttemptID id = new TaskAttemptID(new TaskID("123", 1, 2), 0);
+    SpillingQueue<Text> queue = new SpillingQueue<Text>();
+    Configuration conf = new HamaConfiguration();
+    
+    String fileName = 
+        System.getProperty("java.io.tmpdir") + File.separatorChar
+        + new BigInteger(128, new SecureRandom()).toString(32);
+    File file = new File(fileName);
+    conf.set(SpillingQueue.SPILLBUFFER_FILENAME, fileName);
+    queue.init(conf, id);
+    queue.prepareWrite();
+    for(int i = 0; i < 1000; ++i){
+      queue.add(text);
+    }
+    queue.prepareRead();
+    for(Text t: queue){
+      assertTrue(msg.equals(t.toString()));
+      text.clear();
+    }
+    
+    assertTrue(queue.poll() == null);
+    
+    assertTrue(file.exists());
+    queue.close();
+    assertFalse(file.exists());
+  }
+
+}