You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/01/13 21:52:04 UTC
svn commit: r1432734 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/
core/src/main/java/org/apache/hama/bsp/message/io/
core/src/main/java/org/apache/hama/bsp/message/queue/
core/src/test/java/org/apache/hama/bsp/message/
Author: surajsmenon
Date: Sun Jan 13 20:52:03 2013
New Revision: 1432734
URL: http://svn.apache.org/viewvc?rev=1432734&view=rev
Log:
[HAMA-559] Added spilling queue.
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1432734&r1=1432733&r2=1432734&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sun Jan 13 20:52:03 2013
@@ -4,6 +4,7 @@ Release 0.7 (unreleased changes)
NEW FEATURES
+ HAMA-559: Added spilling queue. (surajsmenon)
HAMA-700: BSPPartitioner should be configurable to be optional and allow input format conversion (surajsmenon)
HAMA-524: Add SpMV example (Mikalai Parafeniuk via edwardyoon)
HAMA-658: Add random symmetric sparse matrix generator (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1432734&r1=1432733&r2=1432734&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Sun Jan 13 20:52:03 2013
@@ -68,6 +68,8 @@ public interface Constants {
public static final String COMBINER_CLASS = "bsp.combiner.class";
public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
+
+ public static final String COMBINER_CLASS = "bsp.combiner.class";
////////////////////////////////////////
// Task scheduler related constants
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+/**
+ * The implementation of the shared read-index object used when there was no
+ * spilling in the first place: all the data lives in the in-memory buffers,
+ * which are simply handed out one after the other without blocking.
+ */
+class BufferReadStatus extends ReadIndexStatus {
+
+  /** Index handed out by the previous call; -1 before the first call. */
+  private int index;
+  /** Number of in-memory buffers that may be read. */
+  private final int count;
+
+  /**
+   * @param bufferCount number of in-memory buffers available for reading.
+   */
+  public BufferReadStatus(int bufferCount) {
+    index = -1;
+    count = bufferCount;
+  }
+
+  /**
+   * Returns the index of the next buffer to read from. Never blocks.
+   *
+   * @return the next index in the range [0, bufferCount), or -1 once every
+   *         buffer has been handed out (also when bufferCount is 0).
+   */
+  @Override
+  public int getReadBufferIndex() {
+    // index holds the last value handed out, so count - 1 is the final valid
+    // one; anything beyond that would point past the end of the buffer list.
+    if (index >= count - 1) {
+      return -1;
+    }
+    return ++index;
+  }
+
+  /**
+   * Nothing was spilled, so there is never a buffer to load from a file.
+   *
+   * @return always -1.
+   */
+  @Override
+  public int getFileBufferIndex() {
+    return -1;
+  }
+
+  /** No-op: there is no background reader to notify. */
+  @Override
+  public void completeReading() {
+  }
+
+  /** No-op: there is no background reader to wait for. */
+  @Override
+  public void startReading() {
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.message.io.PreFetchCache;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataReadStatus;
+
+/**
+ * Cache that reads messages ahead of the consumer. A background thread
+ * deserializes records from a {@link SpilledDataInputBuffer} into a fixed set
+ * of reusable lists while the consumer drains already filled lists through
+ * {@link #get()}. The hand-over of lists between both threads is coordinated
+ * by a shared {@link SpilledDataReadStatus}.
+ *
+ * @param <M> type of the messages read when a concrete class is supplied to
+ *          {@link #startFetching(Class, SpilledDataInputBuffer, Configuration)}.
+ */
+public class PreFetchCache<M extends Writable> {
+  private static final Log LOG = LogFactory.getLog(PreFetchCache.class);
+  /** One list of messages per buffer slot; elements are reused across fills. */
+  private final Object[] objectListArr;
+  private final long totalMessages;
+  /** Maximum number of messages held by one list. */
+  private final int capacity;
+  /** Slot the consumer is currently reading from. */
+  private int arrIndex;
+  /** Position of the next message inside the current slot. */
+  private int listIndex;
+  private PreFetchThread<M> preFetchThread;
+  private final SpilledDataReadStatus status;
+  private final BitSet bufferBitSet;
+
+  /**
+   * Background thread that fills the lists with messages deserialized from
+   * the input stream.
+   */
+  private static class PreFetchThread<M extends Writable> extends Thread {
+
+    // Written by the consumer thread, read by this thread.
+    private volatile boolean stopReading;
+
+    SpilledDataInputBuffer stream;
+    Class<M> objectClass;
+    private Object[] listArr;
+    SpilledDataReadStatus status;
+    private long totalMsgs;
+    private int listCapacity;
+    private int messageCount;
+    private Configuration conf;
+
+    /**
+     * Reads up to <code>listCapacity</code> messages into the list at the
+     * given slot. Objects already present in the list are reused by calling
+     * <code>readFields</code> on them; new ones are only created while the
+     * list is still shorter than the number of messages read in this pass.
+     *
+     * @param index slot of the list to fill.
+     * @throws IOException if deserializing a message fails.
+     */
+    private void fill(int index) throws IOException {
+      @SuppressWarnings("unchecked")
+      List<? super Writable> list = (List<? super Writable>) listArr[index];
+
+      for (int i = 0; i < listCapacity && messageCount < totalMsgs
+          && !stopReading; ++i) {
+
+        if (i == list.size()) {
+          if (objectClass == null) {
+            // No message class given: fall back to the generic wrapper.
+            ObjectWritable writable = new ObjectWritable();
+            writable.readFields(stream);
+            list.add(i, writable);
+          } else {
+            M obj = ReflectionUtils.newInstance(objectClass, conf);
+            obj.readFields(stream);
+            list.add(i, obj);
+          }
+        } else {
+          Writable obj = (Writable) list.get(i);
+          obj.readFields(stream);
+        }
+        ++messageCount;
+      }
+
+    }
+
+    public PreFetchThread(Class<M> classObj, Object[] objectListArr,
+        int capacity, SpilledDataInputBuffer inStream, long totalMessages,
+        SpilledDataReadStatus indexStatus, Configuration conf) {
+      objectClass = classObj;
+      listArr = objectListArr;
+      status = indexStatus;
+      this.conf = conf;
+      totalMsgs = totalMessages;
+      messageCount = 0;
+      this.stream = inStream;
+      listCapacity = capacity;
+    }
+
+    /**
+     * Fills one slot after the other until all messages are read, the
+     * status object reports no further slot, or a stop was requested.
+     */
+    @Override
+    public void run() {
+      boolean interrupted = false;
+      try {
+        int index;
+        while ((index = status.getFileBufferIndex()) >= 0 && !stopReading) {
+          fill(index);
+          if (stopReading || messageCount == totalMsgs) {
+            status.closedBySpiller();
+            break;
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted reading pre-fetch buffer index.", e);
+        interrupted = true;
+      } catch (IOException e) {
+        LOG.error("Error reading pre-fetch buffer index.", e);
+      } finally {
+        // Always signal completion, even if an unchecked exception ends
+        // this thread.
+        status.completeReading();
+        if (interrupted) {
+          // Restore the flag only after the status object was notified.
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    /** Asks the thread to stop; the volatile write needs no extra locking. */
+    public void stopReading() {
+      stopReading = true;
+    }
+
+  }
+
+  /**
+   * Creates a cache with 2 buffers of 64 messages each.
+   *
+   * @param totalMessages total number of messages to read from the stream.
+   */
+  public PreFetchCache(long totalMessages) {
+    this(2, totalMessages, 64);
+  }
+
+  /**
+   * Creates a cache with the given number of buffers of 64 messages each.
+   *
+   * @param numBuffers number of lists to cycle through.
+   * @param totalMessages total number of messages to read from the stream.
+   */
+  public PreFetchCache(int numBuffers, long totalMessages) {
+    this(numBuffers, totalMessages, 64);
+  }
+
+  /**
+   * @param numBuffers number of lists to cycle through.
+   * @param totalMessages total number of messages to read from the stream.
+   * @param capacity maximum number of messages held by one list.
+   */
+  public PreFetchCache(int numBuffers, long totalMessages, int capacity) {
+    this.objectListArr = new Object[numBuffers];
+    this.totalMessages = totalMessages;
+    this.bufferBitSet = new BitSet();
+    status = new SpilledDataReadStatus(numBuffers, bufferBitSet);
+    for (int i = 0; i < numBuffers; ++i) {
+      this.objectListArr[i] = new ArrayList<M>(capacity);
+    }
+    this.capacity = capacity;
+  }
+
+  /**
+   * Starts the background thread and obtains the first slot to read from.
+   *
+   * @param classObject class of the messages, or null to read them as
+   *          <code>ObjectWritable</code>.
+   * @param buffer stream the messages are deserialized from.
+   * @param conf configuration used to instantiate the messages.
+   * @throws InterruptedException if interrupted while obtaining the first
+   *           slot.
+   */
+  public void startFetching(Class<M> classObject,
+      SpilledDataInputBuffer buffer, Configuration conf)
+      throws InterruptedException {
+
+    preFetchThread = new PreFetchThread<M>(classObject, objectListArr,
+        capacity, buffer, totalMessages, status, conf);
+    preFetchThread.start();
+    status.startReading();
+    arrIndex = status.getReadBufferIndex();
+  }
+
+  /**
+   * Returns the next prefetched message, moving on to the next slot once
+   * <code>capacity</code> messages were taken from the current one.
+   * <p>
+   * NOTE(review): this method does not count messages against the total, so
+   * the caller has to stop after the expected number of messages; entries
+   * behind the last message of a partially filled list are leftovers of an
+   * earlier fill or do not exist.
+   *
+   * @return the next message, or null if no further slot is available or the
+   *         thread was interrupted while waiting for one.
+   */
+  @SuppressWarnings("unchecked")
+  public Writable get() {
+    if (listIndex == capacity) {
+      try {
+        arrIndex = status.getReadBufferIndex();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted getting prefetched records.", e);
+        // Keep the interruption visible to the caller.
+        Thread.currentThread().interrupt();
+        return null;
+      }
+      if (arrIndex < 0) {
+        return null;
+      }
+      listIndex = 0;
+    }
+    return ((List<M>) (this.objectListArr[arrIndex])).get(listIndex++);
+  }
+
+  /**
+   * Signals completion to the status object, asks the background thread to
+   * stop and waits for it to finish. Safe to call when fetching was never
+   * started.
+   */
+  public void close() {
+    status.completeReading();
+    if (preFetchThread == null) {
+      // startFetching was never called; there is no thread to stop.
+      return;
+    }
+    preFetchThread.stopReading();
+    try {
+      preFetchThread.join();
+    } catch (InterruptedException e) {
+      LOG.error("Prefetch thread was interrupted.", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+/**
+ * The base class that defines the shared object that synchronizes the
+ * indexes for the byte array such that there is no loss of data. One side
+ * asks for the buffer to read from, the other side for the buffer to load
+ * with data from the spilled file.
+ */
+abstract class ReadIndexStatus {
+
+ /**
+ * Returns index of the byte array that is ready for the data to be read
+ * from. The implementation may or may not block depending on the spilling
+ * situation.
+ *
+ * @return the index; callers in this package treat a negative value as
+ * "no further buffer".
+ * @throws InterruptedException if the thread is interrupted while blocked.
+ */
+ public abstract int getReadBufferIndex() throws InterruptedException;
+
+ /**
+ * Returns the index of the byte array that could be used to load the data
+ * from the spilled file. The implementation may or may not block
+ * depending on the spilling situation.
+ *
+ * @return the index of the buffer to fill; callers in this package stop
+ * loading when the value is negative.
+ * @throws InterruptedException if the thread is interrupted while blocked.
+ */
+ public abstract int getFileBufferIndex() throws InterruptedException;
+
+ /**
+ * Indicate that the operation is complete.
+ */
+ public abstract void completeReading();
+
+ /**
+ * Indicate to start reading.
+ */
+ public abstract void startReading();
+
+}
+
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+/**
+ * This class provides a thread-safe interface for both the spilling thread and
+ * the thread that is writing into <code>SpillingBuffer</code>. It stores the
+ * state and manipulates the next available buffer for both the threads.
+ * Every method is <code>synchronized</code> on this object and the threads
+ * hand buffers over to each other with <code>wait()</code> and
+ * <code>notify()</code>. A set bit in the shared <code>BitSet</code> marks a
+ * buffer that was written and is not yet released by the spilling thread.
+ */
+class SpillWriteIndexStatus {
+
+ private volatile int bufferListWriteIndex; // buffer currently owned by the writing thread
+ private volatile int processorBufferIndex; // buffer last handed to the spilling thread
+ private volatile boolean spillComplete; // set once by spillCompleted()
+ private volatile boolean spillStart; // set on the spilling thread's first request
+ private int numBuffers; // both indexes cycle modulo this value
+ private volatile BitSet bufferBitState; // bit i set: buffer i written, not yet released
+
+ SpillWriteIndexStatus(int size, int bufferCount, int bufferIndex, // NOTE(review): 'size' is never used
+ int fileWriteIndex, BitSet bufferBitState) {
+ spillComplete = false;
+ spillStart = false;
+ bufferListWriteIndex = bufferIndex;
+ processorBufferIndex = fileWriteIndex;
+ numBuffers = bufferCount;
+ this.bufferBitState = bufferBitState;
+ }
+
+ /**
+ * Returns the index of next available buffer from the list. The call blocks
+ * until it is available and indicated by the spilling thread. Before moving
+ * on, the buffer written so far is marked as ready to be spilled and a
+ * waiting thread is notified.
+ *
+ * @return the index of the next available buffer
+ * @throws IOException when the thread is interrupted while blocked on the
+ * availability of the next buffer.
+ */
+ public synchronized int getNextBufferIndex() throws IOException {
+ assert !spillComplete;
+ bufferBitState.set(bufferListWriteIndex, true); // hand the filled buffer over
+ notify();
+ bufferListWriteIndex = (bufferListWriteIndex + 1) % numBuffers;
+ while (bufferBitState.get(bufferListWriteIndex)) { // next buffer not released yet
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+ return bufferListWriteIndex;
+ }
+
+ /**
+ * Returns the index of the next buffer from the list that has information
+ * written to be spilled to disk. The call blocks until a buffer is available
+ * and indicated by the data writing thread. Asking for the next buffer also
+ * releases the buffer that was returned by the previous call.
+ *
+ * @return the index of the next to be spilled buffer, or -1 when spilling
+ * was completed and no written buffer is left.
+ * @throws InterruptedException when the thread is interrupted while blocked
+ * on the availability of the next buffer.
+ */
+ public synchronized int getNextProcessorBufferIndex()
+ throws InterruptedException {
+
+ // Indicate to main thread that the spilling thread has started.
+ if (!spillStart) {
+ spillStart = true;
+ notify();
+ }
+ if (processorBufferIndex >= 0) { // a buffer was handed out by an earlier call
+ assert bufferBitState.get(processorBufferIndex);
+ bufferBitState.set(processorBufferIndex, false); // release it to the writer
+ notify();
+ }
+ processorBufferIndex = (processorBufferIndex + 1) % numBuffers;
+ while (!bufferBitState.get(processorBufferIndex) && !spillComplete) {
+ wait();
+ }
+ // Is the last buffer written to file after the spilling is complete?
+ // then complete the operation.
+ if (spillComplete && bufferBitState.isEmpty()) {
+ return -1;
+ }
+ return processorBufferIndex;
+ }
+
+ /**
+ * This call indicates that the spilling has started. It is blocked until the
+ * spilling thread makes its first call to get the buffer with data to be
+ * spilled.
+ *
+ * @throws InterruptedException if the thread is interrupted while waiting.
+ */
+ public synchronized void startSpilling() throws InterruptedException {
+
+ while (!spillStart) {
+ wait();
+ }
+ }
+
+ /**
+ * This call informs the closing of the buffer and thereby indicating the
+ * spilling thread to complete. The buffer being written at that moment is
+ * marked as ready to be spilled.
+ */
+ public synchronized void spillCompleted() {
+ assert !spillComplete;
+ spillComplete = true;
+ bufferBitState.set(bufferListWriteIndex, true); // flush the partially written buffer
+ notify();
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hama.bsp.message.io.BufferReadStatus;
+import org.apache.hama.bsp.message.io.ReadIndexStatus;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataReadStatus;
+
+/**
+ * <code>SpilledDataInputBuffer</code> class is designed to read from the
+ * spilling buffer. Depending on whether the records were spilled or not, the
+ * stream provides data from a list of byte arrays or from file. The contents of
+ * the file are asynchronously loaded in the byte arrays as the values are read.
+ */
+public class SpilledDataInputBuffer extends DataInputStream implements
+    DataInput {
+
+  /**
+   * The task used to asynchronously read from the spilled file and load
+   * the buffers for the user to read from byte arrays in heap.
+   */
+  static class SpillReadThread implements Callable<Boolean> {
+
+    private String fileName;
+    private List<ByteBuffer> bufferList_;
+    private long bytesToRead_;
+    private long bytesWrittenInFile_;
+    private SpilledDataReadStatus status_;
+    // May be set through completeRead() by a thread other than the one
+    // running the read loop, hence volatile.
+    private volatile boolean closed_;
+
+    /**
+     * Creates the task to read the contents of the file and load them into
+     * the list of byte buffers.
+     *
+     * @param fileName Name of the file
+     * @param bufferList list of byte buffers.
+     * @param status The shared object that synchronizes the indexes for buffer
+     *          to fill the data with.
+     */
+    public SpillReadThread(String fileName, List<ByteBuffer> bufferList,
+        SpilledDataReadStatus status) {
+      this.fileName = fileName;
+      bufferList_ = bufferList;
+      status_ = status;
+      closed_ = false;
+    }
+
+    /**
+     * Keeps reading from file and loads the next available byte buffer with
+     * the data from the file until the whole file is consumed, the status
+     * object reports no further buffer, or the task was asked to stop.
+     *
+     * @throws IOException if the file cannot be read or the thread is
+     *           interrupted while waiting for a buffer.
+     */
+    private void keepReadingFromFile() throws IOException {
+      RandomAccessFile raf = new RandomAccessFile(fileName, "r");
+      try {
+        FileChannel fc = raf.getChannel();
+        bytesToRead_ = fc.size();
+        bytesWrittenInFile_ = bytesToRead_;
+        long fileReadPos = 0;
+        int fileReadIndex = -1;
+        do {
+          try {
+            fileReadIndex = status_.getFileBufferIndex();
+          } catch (InterruptedException e1) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e1);
+          }
+
+          if (fileReadIndex < 0) {
+            break;
+          }
+
+          ByteBuffer buffer = bufferList_.get(fileReadIndex);
+          buffer.clear();
+          // Never map beyond the end of the file.
+          long readSize = Math.min(buffer.remaining(),
+              (bytesWrittenInFile_ - fileReadPos));
+
+          MappedByteBuffer mBuffer = fc.map(MapMode.READ_ONLY, fileReadPos,
+              readSize);
+          buffer.put(mBuffer);
+          buffer.flip();
+          bytesToRead_ -= readSize;
+          fileReadPos += readSize;
+
+        } while (!closed_ && bytesToRead_ > 0 && fileReadIndex >= 0
+            && fileReadIndex < bufferList_.size());
+      } finally {
+        // Closing the file also closes its channel; done in finally so the
+        // descriptor is released on failures as well.
+        raf.close();
+      }
+      closed_ = true;
+      status_.closedBySpiller();
+    }
+
+    /*
+     * Indicate the thread to close.
+     */
+    public void completeRead() {
+      closed_ = true;
+    }
+
+    public boolean isClosed() {
+      return closed_;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+      keepReadingFromFile();
+      return Boolean.TRUE;
+    }
+
+  }
+
+  /**
+   * The input stream that encapsulates a previously written spilled buffer that
+   * has a list of byte arrays and a file to read from in the case where
+   * spilling is already done. If spilling has not happened, all the data is
+   * present in the list of byte arrays.
+   */
+  static class SpilledInputStream extends InputStream {
+
+    private String fileName_;
+    private List<ByteBuffer> bufferList_;
+    private boolean spilledAlready_;
+    ReadIndexStatus status_;
+    private final byte[] readByte = new byte[1];
+    /** Number of unread bytes left in {@link #buf}; never negative. */
+    private int count;
+    /** Intermediate buffer between the byte buffers and the caller. */
+    private final byte[] buf;
+    /** Position of the next unread byte in {@link #buf}. */
+    private int pos;
+    private Callable<Boolean> spillReadThread_;
+    private Future<Boolean> spillReadState_;
+    private ExecutorService spillThreadService_;
+    private ByteBuffer currentReadBuffer_;
+    private BitSet bufferBitState_;
+
+    private boolean closed_;
+
+    /**
+     * @param fileName name of the spill file.
+     * @param direct not used by this class.
+     * @param bufferList buffers holding the data or receiving it from the
+     *          file.
+     * @param hasSpilled true if data has to be loaded from the spill file.
+     * @throws IOException declared for callers; not thrown by the current
+     *           implementation.
+     */
+    public SpilledInputStream(String fileName, boolean direct,
+        List<ByteBuffer> bufferList, boolean hasSpilled) throws IOException {
+      fileName_ = fileName;
+      bufferList_ = bufferList;
+      spilledAlready_ = hasSpilled;
+      bufferBitState_ = new BitSet(bufferList.size());
+
+      if (spilledAlready_) {
+        status_ = new SpilledDataReadStatus(bufferList.size(), bufferBitState_);
+      } else {
+        status_ = new BufferReadStatus(bufferList.size());
+      }
+
+      buf = new byte[8192];
+      count = 0;
+      pos = 0;
+      closed_ = false;
+
+    }
+
+    /**
+     * Starts the asynchronous file reader if the data was spilled and fetches
+     * the first buffer to read from.
+     *
+     * @throws IOException if no buffer could be obtained or the file reader
+     *           failed.
+     */
+    public void prepareRead() throws IOException {
+      if (spilledAlready_) {
+        spillReadThread_ = new SpillReadThread(fileName_, bufferList_,
+            (SpilledDataReadStatus) status_);
+        spillThreadService_ = Executors.newFixedThreadPool(1);
+        spillReadState_ = spillThreadService_.submit(spillReadThread_);
+        status_.startReading();
+      }
+      try {
+        currentReadBuffer_ = getNextBuffer();
+      } catch (InterruptedException e1) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e1);
+      }
+
+      if (currentReadBuffer_ == null) {
+        if (spilledAlready_) {
+          // Surface the reason the file reader delivered nothing, if any.
+          try {
+            spillReadState_.get();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+          } catch (ExecutionException e) {
+            throw new IOException(e);
+          } finally {
+            spillThreadService_.shutdownNow();
+          }
+        }
+        throw new IOException("Could not initialize the buffer for reading");
+      }
+    }
+
+    /**
+     * @return the next buffer ready to be read, or null if the status object
+     *         reports an index outside of the buffer list.
+     * @throws InterruptedException if interrupted while waiting for a buffer.
+     */
+    public ByteBuffer getNextBuffer() throws InterruptedException {
+      int index = status_.getReadBufferIndex();
+      if (index >= 0 && index < bufferList_.size()) {
+        return bufferList_.get(index);
+      }
+      return null;
+
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (count > 0) {
+        --count;
+        return buf[pos++] & 0xFF;
+      }
+
+      if (-1 == read(readByte, 0, 1)) {
+        return -1;
+      }
+      return readByte[0] & 0xFF;
+    }
+
+    /**
+     * Reads up to <code>len</code> bytes, refilling the intermediate buffer
+     * as often as needed.
+     *
+     * @return the number of bytes copied into <code>b</code>, or -1 if the
+     *         end of the data was reached before any byte could be copied.
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+
+      if (count >= len) {
+        // copy the count of bytes to b
+        System.arraycopy(buf, pos, b, off, len);
+        count -= len;
+        pos += len;
+        return len;
+      }
+      int size = 0;
+
+      while (len > 0) {
+        if (count == 0) {
+          int filled = readInternal(buf, 0, buf.length);
+          pos = 0;
+          if (filled <= 0) {
+            // End of data. Keep count at zero so later calls report EOF
+            // again instead of copying a negative length, and do not drop
+            // the bytes already handed to the caller.
+            count = 0;
+            return size > 0 ? size : -1;
+          }
+          count = filled;
+        }
+        int readSize = Math.min(count, len);
+        System.arraycopy(buf, pos, b, off, readSize);
+        len -= readSize;
+        off += readSize;
+        size += readSize;
+        count -= readSize;
+        pos += readSize;
+      }
+      return size;
+
+    }
+
+    /**
+     * Copies up to <code>len</code> bytes from the byte buffers into
+     * <code>b</code>, moving on to the next buffer whenever the current one
+     * is exhausted.
+     *
+     * @return the number of bytes copied, which is less than
+     *         <code>len</code> when the buffers ran out, or -1 if there was
+     *         no buffer left on entry.
+     */
+    public int readInternal(byte[] b, int off, int len) throws IOException {
+      if (currentReadBuffer_ == null) {
+        return -1;
+      }
+      int cur = 0;
+      while (len > 0) {
+        int rem = currentReadBuffer_.remaining();
+        if (rem == 0) {
+          try {
+            currentReadBuffer_ = getNextBuffer();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+          }
+          if (currentReadBuffer_ == null) {
+            return cur;
+          }
+          rem = currentReadBuffer_.remaining();
+        }
+        int readSize = Math.min(rem, len);
+        currentReadBuffer_.get(b, off, readSize);
+        len -= readSize;
+        // Advance by what was actually copied, not by what was available.
+        off += readSize;
+        cur += readSize;
+      }
+
+      return cur;
+    }
+
+    /**
+     * Closes the stream and restarts reading with a fresh status object.
+     *
+     * @throws IOException if closing or restarting fails.
+     */
+    public void clear() throws IOException {
+      close();
+      bufferBitState_.clear();
+
+      if (spilledAlready_) {
+        status_ = new SpilledDataReadStatus(bufferList_.size(), bufferBitState_);
+      } else {
+        status_ = new BufferReadStatus(bufferList_.size());
+      }
+
+      count = 0;
+      pos = 0;
+      closed_ = false;
+      prepareRead();
+    }
+
+    /**
+     * Signals completion to the status object and, if the data was spilled,
+     * waits for the file reader and shuts its executor down. Calling it again
+     * on an already closed stream has no effect.
+     *
+     * @throws IOException if the file reader failed or the wait for it was
+     *           interrupted.
+     */
+    @Override
+    public void close() throws IOException {
+
+      if (closed_) {
+        return;
+      }
+      // Mark first so a failure below does not trigger a second shutdown.
+      closed_ = true;
+
+      status_.completeReading();
+      // The future only exists once prepareRead() has been called.
+      if (this.spilledAlready_ && this.spillReadState_ != null) {
+        try {
+          this.spillReadState_.get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e);
+        } finally {
+          spillThreadService_.shutdownNow();
+        }
+      }
+
+    }
+
+    public String getFileName() {
+      return fileName_;
+    }
+
+  }
+
+  /**
+   * Closes the underlying stream and optionally removes the spill file.
+   *
+   * @param deleteFile true to delete the spill file if it exists; the
+   *          deletion is best effort and its result is not checked.
+   * @throws IOException if closing the underlying stream fails.
+   */
+  public void completeReading(boolean deleteFile) throws IOException {
+    in.close();
+    if (deleteFile) {
+      File file = new File(
+          ((SpilledDataInputBuffer.SpilledInputStream) in).getFileName());
+      if (file.exists()) {
+        file.delete();
+      }
+    }
+  }
+
+  public SpilledDataInputBuffer(InputStream in) {
+    super(in);
+  }
+
+  /**
+   * Restarts reading from the beginning; the wrapped stream has to be a
+   * <code>SpilledInputStream</code>.
+   *
+   * @throws IOException if the wrapped stream cannot be restarted.
+   */
+  public void clear() throws IOException {
+    SpilledInputStream inStream = (SpilledInputStream) this.in;
+    inStream.clear();
+  }
+
+  /**
+   * Creates a buffer that reads spilled data from the given file through the
+   * given byte buffers and prepares it for reading.
+   *
+   * @param fileName name of the spill file.
+   * @param direct passed on to the stream, which does not use it.
+   * @param bufferList buffers the file contents are loaded into.
+   * @return the input buffer, ready to be read from.
+   * @throws IOException if the stream cannot be prepared for reading.
+   */
+  public static SpilledDataInputBuffer getSpilledDataInputBuffer(
+      String fileName, boolean direct, List<ByteBuffer> bufferList)
+      throws IOException {
+    SpilledInputStream inStream = new SpilledInputStream(fileName, direct,
+        bufferList, true);
+    inStream.prepareRead();
+    return new SpilledDataInputBuffer(inStream);
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Base interface defining the behaviour to process the spilled data provided
+ * in a byte buffer. Every method reports success through its boolean return
+ * value.
+ */
+public interface SpilledDataProcessor {
+
+ /**
+ * Initialize the data processor.
+ * @param conf the configuration handed to the processor.
+ * @return true if no errors.
+ */
+ boolean init(Configuration conf);
+
+ /**
+ * Override the method to define the action to be taken on the spilled data
+ * provided in the byte buffer.
+ * @param buffer the byte buffer holding the spilled data.
+ * @return true if no errors.
+ */
+ boolean handleSpilledBuffer(ByteBuffer buffer);
+
+ /**
+ * Close the data processor.
+ * @return true if no errors.
+ */
+ boolean close();
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * The implementation of the shared object when the buffer has already
+ * spilled to disk.
+ *
+ */
+class SpilledDataReadStatus extends ReadIndexStatus {
+
+ private static final Log LOG = LogFactory.getLog(SpilledDataReadStatus.class);
+
+ private volatile int readBufferIndex_;
+ private volatile int fetchFileBufferIndex_;
+ private int totalSize_;
+ private volatile boolean spilledReadStart_;
+ private volatile boolean fileReadComplete_;
+ private volatile boolean bufferReadComplete_;
+ private volatile BitSet bufferBitState_;
+
+ public SpilledDataReadStatus(int totalSize, BitSet bufferBitState) {
+ readBufferIndex_ = -1;
+ fetchFileBufferIndex_ = 0;
+ spilledReadStart_ = false;
+ fileReadComplete_ = false;
+ bufferReadComplete_ = false;
+ totalSize_ = totalSize;
+ bufferBitState_ = bufferBitState;
+ }
+
+ @Override
+ public synchronized int getReadBufferIndex() throws InterruptedException {
+
+ assert !bufferReadComplete_;
+
+ if (readBufferIndex_ >= 0) {
+ bufferBitState_.set(readBufferIndex_, false);
+ notify();
+ }
+ readBufferIndex_ = (readBufferIndex_ + 1) % totalSize_;
+ while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_) {
+ wait();
+ }
+ // The file is completely read and transferred to buffers already.
+ if (bufferBitState_.isEmpty() && fileReadComplete_) {
+ return -1;
+ }
+ return readBufferIndex_;
+ }
+
+ @Override
+ public synchronized int getFileBufferIndex() throws InterruptedException {
+
+ assert !fileReadComplete_;
+
+ if (bufferReadComplete_) {
+ return -1;
+ }
+
+ // First read
+ if (!spilledReadStart_) {
+ fetchFileBufferIndex_ = 0;
+ spilledReadStart_ = true;
+ notify();
+ return fetchFileBufferIndex_;
+ }
+
+ bufferBitState_.set(fetchFileBufferIndex_, true);
+ notify();
+ fetchFileBufferIndex_ = (fetchFileBufferIndex_ + 1) % totalSize_;
+
+ while (bufferBitState_.get(fetchFileBufferIndex_)
+ && !bufferReadComplete_) {
+ wait();
+ }
+
+ if (bufferReadComplete_) {
+ return -1;
+ }
+
+ return fetchFileBufferIndex_;
+ }
+
+ @Override
+ public synchronized void completeReading() {
+ bufferReadComplete_ = true;
+ if (fileReadComplete_)
+ return;
+ notify();
+ }
+
+ /**
+ * Called by the thread to indicate that all the spilled data is
+ * completely read.
+ */
+ public synchronized void closedBySpiller() {
+ fileReadComplete_ = true;
+ bufferBitState_.set(fetchFileBufferIndex_, true);
+ notify();
+ }
+
+ @Override
+ public synchronized void startReading() {
+ while (!spilledReadStart_)
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted waiting to read the spilled file.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+}
+
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hama.bsp.message.io.SpillWriteIndexStatus;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataProcessor;
+import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
+import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
+
+/**
+ * <code>SpillingDataOutputBuffer</code> is an output stream comprised of byte
+ * arrays that keeps values in memory until a particular threshold is reached.
+ * Once this threshold is exceeded, the values are spilled to disk and all the
+ * contents of the buffer are written to a file until the stream is closed. The
+ * implementation uses a list of byte arrays and hence a user of this class may
+ * provide the size of each byte array to hold the data. The threshold could
+ * also be specified provided it exceeds the size of a single byte array. Once
+ * the stream is closed, the class provides an input stream to read the data
+ * written which may or may not have been spilled.
+ *
+ */
+public class SpillingDataOutputBuffer extends DataOutputStream {
+
+ private static final Log LOG = LogFactory.getLog(SpillingDataOutputBuffer.class);
+
+ /**
+ * This thread is responsible for writing from the ByteBuffers in the list to
+ * the file as they get available.
+ */
+ static class ProcessSpilledDataThread implements Callable<Boolean> {
+ private SpillWriteIndexStatus status_;
+ private List<ByteBuffer> bufferList_;
+ private long fileWrittenSize_;
+ private boolean closed;
+ SpilledDataProcessor processor;
+
+ ProcessSpilledDataThread(SpillWriteIndexStatus status,
+ List<ByteBuffer> bufferList, SpilledDataProcessor processor) {
+ status_ = status;
+ bufferList_ = bufferList;
+ closed = false;
+ this.processor = processor;
+ }
+
+ /**
+ * Keep writing to the file as the buffers become available.
+ *
+ * @throws IOException when the thread is interrupted while waiting to get
+ * the index of the buffer to be written to the file.
+ */
+ private void keepProcessingData() throws IOException {
+
+ int fileWriteIndex = -1;
+ do {
+
+ try {
+ fileWriteIndex = status_.getNextProcessorBufferIndex();
+ } catch (InterruptedException e1) {
+ throw new IOException(e1);
+ }
+ while (fileWriteIndex >= 0) {
+ ByteBuffer buffer = bufferList_.get(fileWriteIndex);
+ processor.handleSpilledBuffer(buffer);
+ buffer.clear();
+ try {
+ fileWriteIndex = status_.getNextProcessorBufferIndex();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted getting next index to process data.", e);
+ throw new IOException(e);
+ }
+
+ }
+ } while (!closed);
+
+
+ }
+
+ /*
+ * Indicate to the thread that the spilling process is complete.
+ */
+ public void completeSpill() {
+ closed = true;
+ }
+
+ /**
+ * Gets the size of file written in bytes.
+ *
+ * @return the size of file written.
+ */
+ public long getFileWrittenSize() {
+ return fileWrittenSize_;
+ }
+
+ @Override
+ public Boolean call() throws Exception {
+ keepProcessingData();
+ return Boolean.TRUE;
+ }
+
+ }
+
+ /**
+ * This class is responsible for holding the <code>ByteBuffer</code> arrays
+ * and writing data into the buffers. Once the threshold is crossed it
+ * invokes a spilling thread that would spill the data from the buffer to the
+ * disk.
+ *
+ */
+ static class SpillingStream extends OutputStream {
+
+ final byte[] b;
+ final boolean direct_;
+
+ private List<ByteBuffer> bufferList_;
+ private int bufferSize_;
+ private BitSet bufferState_;
+ private int numberBuffers_;
+ private ByteBuffer currentBuffer_;
+ private long bytesWritten_;
+ private long bytesRemaining_;
+ private SpillWriteIndexStatus spillStatus_;
+ private int thresholdSize_;
+ private boolean startedSpilling_;
+ private ProcessSpilledDataThread spillThread_;
+ private ExecutorService spillThreadService_;
+ private Future<Boolean> spillThreadState_;
+ private boolean closed_;
+
+ private SpilledDataProcessor processor;
+ /**
+ * The internal buffer where data is stored.
+ */
+ protected byte buf[];
+
+ /**
+ * The number of valid bytes in the buffer. This value is always in the
+ * range <tt>0</tt> through <tt>buf.length</tt>; elements <tt>buf[0]</tt>
+ * through <tt>buf[count-1]</tt> contain valid byte data.
+ */
+ protected int count;
+
+ /**
+ * Default intermediate buffer size;
+ */
+ protected int defaultBufferSize_;
+
+ /**
+ *
+ * @param numBuffers The number of ByteBuffer the class should hold
+ * @param bufferSize The size of each ByteBuffer.
+ * @param threshold The threshold after which the spilling should start.
+ * @param direct true indicates the ByteBuffer be allocated direct
+ * @param processor The processor whose handleSpilledBuffer is called with
+ * each buffer that is spilled.
+ */
+ SpillingStream(int numBuffers, int bufferSize, int threshold,
+ boolean direct, SpilledDataProcessor processor) {
+ this(numBuffers, bufferSize, threshold, direct, processor, 8192);
+
+ }
+
+ /**
+ *
+ * @param numBuffers The number of ByteBuffer the class should hold
+ * @param bufferSize The size of each ByteBuffer.
+ * @param threshold The threshold after which the spilling should start.
+ * @param direct true indicates the ByteBuffer be allocated direct
+ * @param processor The processor called with each buffer that is spilled.
+ * @param interBufferSize The size of the intermediate byte array buffer.
+ */
+ SpillingStream(int numBuffers, int bufferSize, int threshold,
+ boolean direct, SpilledDataProcessor processor, int interBufferSize) {
+
+ assert (threshold >= bufferSize);
+ assert (threshold < numBuffers * bufferSize);
+ if(interBufferSize > bufferSize){
+ interBufferSize = bufferSize/2;
+ }
+ defaultBufferSize_ = interBufferSize;
+ this.b = new byte[1];
+ this.buf = new byte[defaultBufferSize_];
+ count = 0;
+ direct_ = direct;
+ numberBuffers_ = numBuffers;
+ bufferSize_ = bufferSize;
+ bufferList_ = new ArrayList<ByteBuffer>(numberBuffers_);
+ bufferState_ = new BitSet(numBuffers);
+
+ for (int i = 0; i < numBuffers / 2; ++i) {
+ if (direct_) {
+ bufferList_.add(ByteBuffer.allocateDirect(bufferSize_));
+ } else {
+ bufferList_.add(ByteBuffer.allocate(bufferSize_));
+ }
+ }
+ currentBuffer_ = bufferList_.get(0);
+ bytesWritten_ = 0L;
+ bytesRemaining_ = bufferSize_;
+ spillStatus_ = new SpillWriteIndexStatus(bufferSize, numberBuffers_, 0,
+ -1, bufferState_);
+ thresholdSize_ = threshold;
+ startedSpilling_ = false;
+ spillThread_ = null;
+ spillThreadState_ = null;
+ this.processor = processor;
+ closed_ = false;
+
+ }
+
+ public void clear() throws IOException{
+ this.close();
+ startedSpilling_ = false;
+ bufferState_.clear();
+
+ for (int i = 0; i < bufferList_.size(); ++i) {
+ bufferList_.get(i).clear();
+ }
+ currentBuffer_ = bufferList_.get(0);
+ bytesWritten_ = 0L;
+ bytesRemaining_ = bufferSize_;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (count < buf.length - 1) {
+ buf[count++] = (byte) (b & 0xFF);
+ return;
+ }
+
+ this.b[0] = (byte) (b & 0xFF);
+ write(this.b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ /**
+ * Start the spilling thread. This is invoked from writeInternal once the
+ * number of bytes written reaches the threshold and spilling has not
+ * started yet.
+ *
+ * @throws InterruptedException
+ */
+ private void startSpilling() throws InterruptedException {
+ synchronized (this) {
+ spillThread_ = new ProcessSpilledDataThread(spillStatus_, bufferList_,
+ processor);
+ startedSpilling_ = true;
+ spillThreadService_ = Executors.newFixedThreadPool(1);
+ spillThreadState_ = spillThreadService_.submit(spillThread_);
+ spillStatus_.startSpilling();
+ }
+ // }
+ }
+
+ public void perfectFillWrite(byte[] b, int off, int len) throws IOException {
+ int rem = currentBuffer_.remaining();
+ while (len > rem) {
+ currentBuffer_.put(b, off, rem);
+ // bytesWritten_ += len;
+ // if (bytesWritten_ > thresholdSize_ && !startedSpilling_) {
+ // try {
+ // startSpilling(rem);
+ // } catch (InterruptedException e) {
+ // throw new IOException("Internal error occured writing to buffer.",
+ // e);
+ // }
+ // }
+
+ currentBuffer_.flip();
+ int index = spillStatus_.getNextBufferIndex();
+ currentBuffer_ = getBuffer(index);
+ off += rem;
+ len -= rem;
+ rem = currentBuffer_.remaining();
+ }
+ currentBuffer_.put(b, off, len);
+ // bytesWritten_ += len;
+ // if (bytesWritten_ > thresholdSize_ && !startedSpilling_) {
+ // try {
+ // startSpilling(len);
+ // } catch (InterruptedException e) {
+ // throw new IOException("Internal error occured writing to buffer.", e);
+ // }
+ // }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (len >= buf.length) {
+ /*
+ * If the request length exceeds the size of the output buffer, flush
+ * the output buffer and then write the data directly. In this way
+ * buffered streams will cascade harmlessly.
+ */
+ flushBuffer();
+ writeInternal(b, off, len);
+ return;
+ }
+ if (len > buf.length - count) {
+ flushBuffer();
+ }
+ System.arraycopy(b, off, buf, count, len);
+ count += len;
+ }
+
+ private void writeInternal(byte[] b, int off, int len) throws IOException {
+
+ bytesWritten_ += len;
+ if (bytesWritten_ >= thresholdSize_ && !startedSpilling_) {
+ try {
+ startSpilling();
+ } catch (InterruptedException e) {
+ throw new IOException("Internal error occured writing to buffer.", e);
+ }
+ }
+
+ if (len > bytesRemaining_) {
+ currentBuffer_.flip();
+ currentBuffer_ = getBuffer(spillStatus_.getNextBufferIndex());
+ bytesRemaining_ = bufferSize_;
+ }
+ currentBuffer_.put(b, off, len);
+ bytesRemaining_ -= len;
+
+ }
+
+ /** Flush the internal buffer */
+ private void flushBuffer() throws IOException {
+ if (count > 0) {
+ writeInternal(buf, 0, count);
+ count = 0;
+ }
+ }
+
+ /**
+ * Gets the ByteBuffer from the buffer list.
+ *
+ * @param index
+ * @return
+ * @throws IOException
+ */
+ ByteBuffer getBuffer(int index) throws IOException {
+
+ if (index >= bufferList_.size()) {
+ if (direct_) {
+ bufferList_.add(index, ByteBuffer.allocateDirect(bufferSize_));
+ } else {
+ bufferList_.add(index, ByteBuffer.allocate(bufferSize_));
+ }
+ }
+
+ return bufferList_.get(index);
+ }
+
+ /**
+ * Flushes the intermediate buffer and completes the spilling process.
+ */
+ public void flush() throws IOException {
+ flushBuffer();
+ flushInternal();
+ }
+
+ public void flushInternal() throws IOException {
+ if (closed_)
+ return;
+
+ currentBuffer_.flip();
+ spillStatus_.spillCompleted();
+ if (this.startedSpilling_) {
+ this.spillThread_.completeSpill();
+ boolean completionState = false;
+ try {
+ completionState = spillThreadState_.get();
+ if (!completionState) {
+ throw new IOException(
+ "Spilling Thread failed to complete sucessfully.");
+ }
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ } finally {
+ closed_ = true;
+ this.processor.close();
+ this.spillThreadService_.shutdownNow();
+ }
+
+ }
+ }
+
+ }
+
+ /**
+ * Initialize the spilling buffer with spilling file name
+ *
+ * @param fileName name of the file.
+ * @throws FileNotFoundException
+ */
+ public SpillingDataOutputBuffer(String fileName) throws FileNotFoundException {
+ super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
+ new WriteSpilledDataProcessor(fileName)));
+ }
+
+ public SpillingDataOutputBuffer(SpilledDataProcessor processor) throws FileNotFoundException {
+ super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
+ processor));
+ }
+
+
+
+ /**
+ * Initializes the spilling buffer.
+ * @throws FileNotFoundException
+ */
+ public SpillingDataOutputBuffer() throws FileNotFoundException {
+ super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
+ new WriteSpilledDataProcessor(
+ System.getProperty("java.io.tmpdir") + File.separatorChar
+ + new BigInteger(128, new SecureRandom()).toString(32))));
+ }
+
+ /**
+ *
+ * @param bufferCount
+ * @param bufferSize
+ * @param threshold
+ * @param direct
+ * @param processor
+ */
+ public SpillingDataOutputBuffer(int bufferCount, int bufferSize, int threshold,
+ boolean direct, SpilledDataProcessor processor) {
+ super(new SpillingStream(bufferCount, bufferSize, threshold, direct,
+ processor));
+ }
+
+ public void clear() throws IOException{
+ SpillingStream stream = (SpillingStream) this.out;
+ stream.clear();
+ }
+
+ public boolean hasSpilled(){
+ return ((SpillingStream) this.out).startedSpilling_;
+ }
+
+ /**
+ * Provides an input stream to read from the spilling buffer.
+ *
+ * @throws IOException
+ */
+ public SpilledDataInputBuffer getInputStreamToRead(String fileName) throws IOException {
+
+ SpillingStream stream = (SpillingStream) this.out;
+ SpilledDataInputBuffer.SpilledInputStream inStream = new SpilledDataInputBuffer.SpilledInputStream(
+ fileName, stream.direct_, stream.bufferList_,
+ stream.startedSpilling_);
+ inStream.prepareRead();
+ return new SpilledDataInputBuffer(inStream);
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+/**
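+ * A {@link SpilledDataProcessor} that writes every spilled buffer it is given
+ * to the file named at construction time.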
+ */
+public class WriteSpilledDataProcessor implements SpilledDataProcessor {
+
+ private static final Log LOG = LogFactory
+ .getLog(WriteSpilledDataProcessor.class);
+
+ private FileChannel fileChannel;
+ private RandomAccessFile raf;
+ private String fileName;
+
+ public WriteSpilledDataProcessor(String fileName)
+ throws FileNotFoundException {
+ this.fileName = fileName;
+ raf = new RandomAccessFile(fileName, "rw");
+ fileChannel = raf.getChannel();
+ }
+
+ @Override
+ public boolean init(Configuration conf) {
+ return true;
+ }
+
+ @Override
+ public boolean handleSpilledBuffer(ByteBuffer buffer) {
+ try {
+ fileChannel.write(buffer);
+ return true;
+ } catch (IOException e) {
+ LOG.error("Error writing to file:"+fileName, e);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean close() {
+ try {
+ fileChannel.close();
+ } catch (IOException e) {
+ LOG.error("Error writing to file:" + fileName, e);
+ }
+ return true;
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.io.PreFetchCache;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataProcessor;
+import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
+import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
+
+/**
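+ * A message queue that serializes added messages into a
+ * {@link SpillingDataOutputBuffer} and reads them back from its input stream.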
+ * @param <M>
+ */
+public class SpillingQueue<M extends Writable> implements MessageQueue<M> {
+
+ private static final Log LOG = LogFactory.getLog(SpillingQueue.class);
+
+ private Configuration conf;
+ private SpillingDataOutputBuffer spillOutputBuffer;
+ private int numMessagesWritten;
+ private int numMessagesRead;
+
+ public final static String SPILLBUFFER_COUNT = "hama.io.spillbuffer.count";
+ public final static String SPILLBUFFER_SIZE = "hama.io.spillbuffer.size";
+ public final static String SPILLBUFFER_FILENAME = "hama.io.spillbuffer.filename";
+ public final static String SPILLBUFFER_THRESHOLD = "hama.io.spillbuffer.threshold";
+ public final static String SPILLBUFFER_DIRECT = "hama.io.spillbuffer.direct";
+ public final static String ENABLE_PREFETCH = "hama.io.spillbuffer.enableprefetch";
+ public final static String SPILLBUFFER_MSGCLASS = "hama.io.spillbuffer.msgclass";
+ private int bufferCount;
+ private int bufferSize;
+ private String fileName;
+ private int threshold;
+ private boolean direct;
+ private SpilledDataInputBuffer spilledInput;
+ private boolean objectWritableMode;
+ private ObjectWritable objectWritable;
+
+ private Class<M> messageClass;
+ private PreFetchCache<M> prefetchCache;
+ private boolean enablePrefetch;
+
+
+ private class SpillIterator implements Iterator<M> {
+
+ private boolean objectMode;
+ private Class<M> classObject;
+ private M messageHolder;
+
+ public SpillIterator(boolean mode, Class<M> classObj, Configuration conf){
+ this.objectMode = mode;
+ this.classObject = classObj;
+ if(classObject != null){
+ messageHolder = ReflectionUtils.newInstance(classObj, conf);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return numMessagesRead != numMessagesWritten && numMessagesWritten > 0;
+ }
+
+ @Override
+ public M next() {
+ if(objectMode){
+ return poll();
+ }
+ else {
+ return poll(messageHolder);
+ }
+ }
+
+ @Override
+ public void remove() {
+ // do nothing
+ }
+
+ }
+
+ @Override
+ public Iterator<M> iterator() {
+ return new SpillIterator(objectWritableMode, messageClass, conf);
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ this.objectWritable.setConf(conf);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void add(M msg) {
+ try {
+ if (objectWritableMode) {
+ objectWritable.set(msg);
+ objectWritable.write(spillOutputBuffer);
+ } else {
+ msg.write(spillOutputBuffer);
+ }
+ ++numMessagesWritten;
+ } catch (IOException e) {
+ LOG.error("Error adding message.", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public void addAll(Collection<M> msgs) {
+ for (M msg : msgs) {
+ add(msg);
+ }
+
+ }
+
+ @Override
+ public void addAll(MessageQueue<M> arg0) {
+ Iterator<M> iter = arg0.iterator();
+ while (iter.hasNext()) {
+ add(iter.next());
+ }
+ }
+
+ @Override
+ public void clear() {
+ try {
+ spillOutputBuffer.close();
+ spillOutputBuffer.clear();
+ spilledInput.close();
+ spilledInput.clear();
+ } catch (IOException e) {
+ LOG.error("Error clearing spill stream.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ try {
+ this.spillOutputBuffer.close();
+ this.spilledInput.close();
+ this.spilledInput.completeReading(true);
+ } catch (IOException e) {
+ LOG.error("Error closing the spilled input stream.", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(Configuration conf, TaskAttemptID arg1) {
+
+ bufferCount = conf.getInt(SPILLBUFFER_COUNT, 3);
+ bufferSize = conf.getInt(SPILLBUFFER_SIZE, 16 * 1024);
+ direct = conf.getBoolean(SPILLBUFFER_DIRECT, true);
+ threshold = conf.getInt(SPILLBUFFER_THRESHOLD, 16 * 1024);
+ fileName = conf.get(SPILLBUFFER_FILENAME,
+ System.getProperty("java.io.tmpdir") + File.separatorChar
+ + new BigInteger(128, new SecureRandom()).toString(32));
+
+ messageClass = (Class<M>) conf.getClass(SPILLBUFFER_MSGCLASS, null);
+ objectWritableMode = messageClass == null;
+
+ SpilledDataProcessor processor;
+ try {
+ processor = new WriteSpilledDataProcessor(fileName);
+ } catch (FileNotFoundException e) {
+ LOG.error("Error initializing spilled data stream.", e);
+ throw new RuntimeException(e);
+ }
+ spillOutputBuffer = new SpillingDataOutputBuffer(bufferCount, bufferSize,
+ threshold, direct, processor);
+ objectWritable = new ObjectWritable();
+ objectWritable.setConf(conf);
+ this.conf = conf;
+ }
+
+ private void incReadMsgCount(){
+ ++numMessagesRead;
+ }
+
+ @SuppressWarnings("unchecked")
+ private M readDirect(M msg){
+ if(numMessagesRead >= numMessagesWritten){
+ return null;
+ }
+ try {
+ if (objectWritableMode) {
+ objectWritable.readFields(spilledInput);
+ incReadMsgCount();
+ return (M) objectWritable.get();
+ } else {
+ msg.readFields(spilledInput);
+ incReadMsgCount();
+ return msg;
+ }
+ } catch (IOException e) {
+ LOG.error("Error getting values from spilled input", e);
+ }
+ return null;
+ }
+
+ public M poll(M msg) {
+ if(numMessagesRead >= numMessagesWritten){
+ return null;
+ }
+ if(enablePrefetch){
+ return readFromPrefetch(msg);
+ }
+ else {
+ return readDirect(msg);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private M readDirectObjectWritable(){
+ if (!objectWritableMode) {
+ throw new IllegalStateException(
+ "API call not supported. Set the configuration property "
+ + "'hama.io.spillbuffer.newmsginit' to true.");
+ }
+ try {
+ objectWritable.readFields(spilledInput);
+ incReadMsgCount();
+ } catch (IOException e) {
+ LOG.error("Error getting values from spilled input", e);
+ return null;
+ }
+ return (M) objectWritable.get();
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private M readFromPrefetch(M msg){
+ if(objectWritableMode){
+ this.objectWritable = (ObjectWritable) prefetchCache.get();
+ incReadMsgCount();
+ return (M)this.objectWritable.get();
+ }
+ else {
+ incReadMsgCount();
+ return (M)this.prefetchCache.get();
+ }
+
+ }
+
+ @Override
+ public M poll() {
+ if(numMessagesRead >= numMessagesWritten){
+ return null;
+ }
+
+ if(enablePrefetch){
+ M msg = readFromPrefetch(null);
+ if(msg != null)
+ incReadMsgCount();
+ return msg;
+ }
+ else {
+ return readDirectObjectWritable();
+ }
+ }
+
+ @Override
+ public void prepareRead() {
+ try {
+ spillOutputBuffer.close();
+ } catch (IOException e) {
+ LOG.error("Error closing spilled buffer", e);
+ throw new RuntimeException(e);
+ }
+ try {
+ spilledInput = spillOutputBuffer.getInputStreamToRead(fileName);
+ } catch (IOException e) {
+ LOG.error("Error initializing the input spilled stream", e);
+ throw new RuntimeException(e);
+ }
+ if(conf.getBoolean(ENABLE_PREFETCH, false)){
+ this.prefetchCache = new PreFetchCache<M>(numMessagesWritten);
+ this.enablePrefetch = true;
+ try {
+ this.prefetchCache.startFetching(this.messageClass, spilledInput, conf);
+ } catch (InterruptedException e) {
+ LOG.error("Error starting prefetch on message queue.", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void prepareWrite() {
+ numMessagesWritten = 0;
+ }
+
+ @Override
+ public int size() {
+ return numMessagesWritten;
+ }
+
+}
Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.EOFException;
+import java.io.File;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
+import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
+
+import junit.framework.TestCase;
+
+ /**
+  * Tests for {@link SpillingDataOutputBuffer} and
+  * {@link SpilledDataInputBuffer}: writing without spilling, writing with a
+  * spill to a file, and reading the spilled data back.
+  */
+ public class TestMessageIO extends TestCase {
+
+   /** Payload that every test serializes repeatedly. */
+   private static final String MESSAGE = "Testing the spillage of spilling buffer";
+
+   /** Number of copies of {@link #MESSAGE} written by each test. */
+   private static final int MESSAGE_COUNT = 100;
+
+   /** Total size the tests expect after writing all copies. */
+   private static final int EXPECTED_BYTES = 4000;
+
+   /**
+    * Builds a random file path under the JVM temporary directory.
+    *
+    * @return the path as a string; the file itself is not created here.
+    */
+   private static String newSpillFileName() {
+     return System.getProperty("java.io.tmpdir") + File.separatorChar
+         + new BigInteger(128, new SecureRandom()).toString(32);
+   }
+
+   /**
+    * Creates the spilling buffer configuration shared by the spill tests.
+    *
+    * @param fileName path handed to the {@link WriteSpilledDataProcessor}.
+    */
+   private static SpillingDataOutputBuffer newSpillingBuffer(String fileName)
+       throws Exception {
+     return new SpillingDataOutputBuffer(2, 1024, 1024, true,
+         new WriteSpilledDataProcessor(fileName));
+   }
+
+   /** Serializes {@code text} into {@code out} {@link #MESSAGE_COUNT} times. */
+   private static void writeMessages(Text text, SpillingDataOutputBuffer out)
+       throws Exception {
+     for (int i = 0; i < MESSAGE_COUNT; ++i) {
+       text.write(out);
+     }
+   }
+
+   /**
+    * A default-constructed buffer must hold all test data without spilling.
+    */
+   public void testNonSpillBuffer() throws Exception {
+     SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer();
+     Text text = new Text(MESSAGE);
+
+     writeMessages(text, outputBuffer);
+
+     assertEquals(EXPECTED_BYTES, outputBuffer.size());
+     assertFalse(outputBuffer.hasSpilled());
+
+     outputBuffer.close();
+   }
+
+   /**
+    * A buffer configured with small internal buffers must spill the test
+    * data to the configured file.
+    */
+   public void testSpillBuffer() throws Exception {
+     String fileName = newSpillFileName();
+     SpillingDataOutputBuffer outputBuffer = newSpillingBuffer(fileName);
+     Text text = new Text(MESSAGE);
+
+     writeMessages(text, outputBuffer);
+
+     assertEquals(EXPECTED_BYTES, outputBuffer.size());
+     assertTrue(outputBuffer.hasSpilled());
+     File f = new File(fileName);
+     assertTrue(f.exists());
+
+     // Close before deleting: removing a file that is still held open fails
+     // on platforms that lock open files.
+     outputBuffer.close();
+     assertTrue(f.delete());
+   }
+
+   /**
+    * Data spilled to a file must be readable back, message by message, and
+    * the file must only be removed when reading is completed with deletion
+    * requested.
+    */
+   public void testSpillInputStream() throws Exception {
+     String fileName = newSpillFileName();
+     SpillingDataOutputBuffer outputBuffer = newSpillingBuffer(fileName);
+     Text text = new Text(MESSAGE);
+
+     writeMessages(text, outputBuffer);
+
+     assertEquals(EXPECTED_BYTES, outputBuffer.size());
+     assertTrue(outputBuffer.hasSpilled());
+     File f = new File(fileName);
+     assertTrue(f.exists());
+     outputBuffer.close();
+     // After closing, everything written must be in the spill file.
+     assertEquals((long) EXPECTED_BYTES, f.length());
+
+     SpilledDataInputBuffer inputBuffer = outputBuffer
+         .getInputStreamToRead(fileName);
+
+     for (int i = 0; i < MESSAGE_COUNT; ++i) {
+       text.readFields(inputBuffer);
+       assertEquals(MESSAGE, text.toString());
+       // Reset so that a stale value cannot satisfy the next iteration.
+       text.clear();
+     }
+
+     try {
+       text.readFields(inputBuffer);
+       fail("Expected EOFException after the last message was read");
+     } catch (EOFException expected) {
+       // All messages were consumed; end of stream is the expected outcome.
+     }
+
+     inputBuffer.close();
+     inputBuffer.completeReading(false);
+     assertTrue(f.exists());
+     inputBuffer.completeReading(true);
+     assertFalse(f.exists());
+   }
+
+ }
Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java?rev=1432734&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java Sun Jan 13 20:52:03 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.File;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskID;
+import org.apache.hama.bsp.message.queue.SpillingQueue;
+
+ /**
+  * Round-trip tests for {@link SpillingQueue}: messages are added, the queue
+  * is switched to reading, every message is checked, and the spill file is
+  * expected to disappear when the queue is closed.
+  */
+ public class TestSpillingQueue extends TestCase {
+
+   /** Payload of every message added to the queue. */
+   private static final String MESSAGE = "Testing the spillage of spilling buffer";
+
+   /** Number of messages added per test. */
+   private static final int MESSAGE_COUNT = 1000;
+
+   /**
+    * Test the spilling queue where the message class is specified.
+    *
+    * @throws Exception
+    */
+   public void testTextSpillingQueue() throws Exception {
+     runRoundTrip(true);
+   }
+
+   /**
+    * Test the spilling queue where the message class is not specified and the
+    * queue uses ObjectWritable to store messages.
+    *
+    * @throws Exception
+    */
+   public void testObjectWritableSpillingQueue() throws Exception {
+     runRoundTrip(false);
+   }
+
+   /**
+    * Writes {@link #MESSAGE_COUNT} copies of {@link #MESSAGE} to a fresh
+    * queue, reads them back and verifies the spill file life cycle.
+    *
+    * @param declareMessageClass when {@code true}, {@link Text} is registered
+    *          under {@code SpillingQueue.SPILLBUFFER_MSGCLASS}; when
+    *          {@code false} that setting is left out of the configuration.
+    */
+   private void runRoundTrip(boolean declareMessageClass) throws Exception {
+     Text text = new Text(MESSAGE);
+     TaskAttemptID id = new TaskAttemptID(new TaskID("123", 1, 2), 0);
+     SpillingQueue<Text> queue = new SpillingQueue<Text>();
+     Configuration conf = new HamaConfiguration();
+
+     String fileName = System.getProperty("java.io.tmpdir")
+         + File.separatorChar
+         + new BigInteger(128, new SecureRandom()).toString(32);
+     File file = new File(fileName);
+     conf.set(SpillingQueue.SPILLBUFFER_FILENAME, fileName);
+     if (declareMessageClass) {
+       conf.setClass(SpillingQueue.SPILLBUFFER_MSGCLASS, Text.class,
+           Writable.class);
+     }
+
+     queue.init(conf, id);
+     queue.prepareWrite();
+     for (int i = 0; i < MESSAGE_COUNT; ++i) {
+       queue.add(text);
+     }
+
+     queue.prepareRead();
+     for (Text t : queue) {
+       assertEquals(MESSAGE, t.toString());
+       // NOTE(review): this clears the instance that was written, not the
+       // one just read; kept as in the original test - confirm intent.
+       text.clear();
+     }
+
+     // Once iteration is done the queue must report that it is exhausted.
+     assertNull(queue.poll());
+
+     assertTrue(file.exists());
+     queue.close();
+     assertFalse(file.exists());
+   }
+
+ }