You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text version.
Posted to commits@hama.apache.org by to...@apache.org on 2013/02/06 15:08:27 UTC
svn commit: r1442969 - in /hama/trunk/core/src: main/java/org/apache/hama/
main/java/org/apache/hama/bsp/message/io/
main/java/org/apache/hama/bsp/message/queue/
test/java/org/apache/hama/bsp/message/
Author: tommaso
Date: Wed Feb 6 14:08:26 2013
New Revision: 1442969
URL: http://svn.apache.org/viewvc?rev=1442969&view=rev
Log:
HAMA-721 - committed Suraj's patch
Modified:
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Wed Feb 6 14:08:26 2013
@@ -64,12 +64,12 @@ public interface Constants {
public static final String MAX_TASK_ATTEMPTS = "bsp.tasks.max.attempts";
public static final String MAX_TASKS_PER_JOB = "bsp.max.tasks.per.job";
-
+
public static final String COMBINER_CLASS = "bsp.combiner.class";
public static final int DEFAULT_MAX_TASK_ATTEMPTS = 2;
-
- ////////////////////////////////////////
+
+ // //////////////////////////////////////
// Task scheduler related constants
// //////////////////////////////////////
@@ -97,11 +97,20 @@ public interface Constants {
// /////////////////////////////////////////////
// Job configuration related parameters.
// /////////////////////////////////////////////
- public static final String JOB_INPUT_DIR = "bsp.input.dir";
- public static final String JOB_PEERS_COUNT = "bsp.peers.num";
- public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class";
+ public static final String JOB_INPUT_DIR = "bsp.input.dir";
+ public static final String JOB_PEERS_COUNT = "bsp.peers.num";
+ public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class";
public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class";
-
+ public static final String MESSAGE_CLASS = "bsp.message.type.class";
+
+ // /////////////////////////////////////////////
+ // Messaging related parameters.
+ // /////////////////////////////////////////////
+ public static final int BUFFER_DEFAULT_SIZE = 16 * 1024;
+ public static final String BYTEBUFFER_SIZE = "bsp.message.bytebuffer.size";
+ public static final String BYTEBUFFER_DIRECT = "bsp.message.bytebuffer.direct";
+ public static final boolean BYTEBUFFER_DIRECT_DEFAULT = true;
+ public static final String DATA_SPILL_PATH = "bsp.data.spill.location";
// /////////////////////////////////////////////
// Constants related to partitioning
@@ -112,7 +121,6 @@ public interface Constants {
public static final String RUNTIME_DESIRED_PEERS_COUNT = "desired.num.of.tasks";
public static final String RUNTIME_PARTITION_RECORDCONVERTER = "bsp.runtime.partition.recordconverter";
-
// /////////////////////////////////////
// Constants for ZooKeeper
// /////////////////////////////////////
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/BufferedReadStatus.java Wed Feb 6 14:08:26 2013
@@ -50,7 +50,8 @@ class BufferReadStatus extends ReadIndex
}
@Override
- public void startReading() {
+ public boolean startReading() {
+ return true;
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/PreFetchCache.java Wed Feb 6 14:08:26 2013
@@ -142,12 +142,14 @@ public class PreFetchCache<M extends Wri
public void startFetching(Class<M> classObject,
SpilledDataInputBuffer buffer, Configuration conf)
- throws InterruptedException {
+ throws InterruptedException, IOException {
preFetchThread = new PreFetchThread<M>(classObject, objectListArr,
capacity, buffer, totalMessages, status, conf);
preFetchThread.start();
- status.startReading();
+ if(!status.startReading()){
+ throw new IOException("Failed to start reading the spilled file: ");
+ }
arrIndex = status.getReadBufferIndex();
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReadIndexStatus.java Wed Feb 6 14:08:26 2013
@@ -52,7 +52,7 @@ abstract class ReadIndexStatus {
/**
* Indicate to start reading.
*/
- public abstract void startReading();
+ public abstract boolean startReading();
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillWriteIndexStatus.java Wed Feb 6 14:08:26 2013
@@ -22,8 +22,8 @@ import java.util.BitSet;
/**
* This class provides a thread-safe interface for both the spilling thread and
- * the thread that is writing into <code>SpillingBuffer</code>. It stores the
- * state and manipulates the next available buffer for both the threads.
+ * the thread that is writing into {@link SpillingDataOutputBuffer}. It stores
+ * the state and manipulates the next available buffer for both the threads.
*/
class SpillWriteIndexStatus {
@@ -33,6 +33,7 @@ class SpillWriteIndexStatus {
private volatile boolean spillStart;
private int numBuffers;
private volatile BitSet bufferBitState;
+ private volatile boolean errorState;
SpillWriteIndexStatus(int size, int bufferCount, int bufferIndex,
int fileWriteIndex, BitSet bufferBitState) {
@@ -42,6 +43,7 @@ class SpillWriteIndexStatus {
processorBufferIndex = fileWriteIndex;
numBuffers = bufferCount;
this.bufferBitState = bufferBitState;
+ errorState = false;
}
/**
@@ -57,14 +59,23 @@ class SpillWriteIndexStatus {
bufferBitState.set(bufferListWriteIndex, true);
notify();
bufferListWriteIndex = (bufferListWriteIndex + 1) % numBuffers;
- while (bufferBitState.get(bufferListWriteIndex)) {
+ while (bufferBitState.get(bufferListWriteIndex) && !errorState) {
try {
wait();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
- return bufferListWriteIndex;
+ return checkError(bufferListWriteIndex);
+ }
+
+ private int checkError(int index) {
+ return errorState ? -1 : index;
+ }
+
+ public void notifyError() {
+ errorState = true;
+ notify();
}
/**
@@ -90,15 +101,16 @@ class SpillWriteIndexStatus {
notify();
}
processorBufferIndex = (processorBufferIndex + 1) % numBuffers;
- while (!bufferBitState.get(processorBufferIndex) && !spillComplete) {
+ while (!bufferBitState.get(processorBufferIndex) && !spillComplete
+ && !errorState) {
wait();
}
// Is the last buffer written to file after the spilling is complete?
// then complete the operation.
- if (spillComplete && bufferBitState.isEmpty()) {
+ if ((spillComplete && bufferBitState.isEmpty()) || errorState) {
return -1;
}
- return processorBufferIndex;
+ return checkError(processorBufferIndex);
}
/**
@@ -107,12 +119,14 @@ class SpillWriteIndexStatus {
* spilled.
*
* @throws InterruptedException
+ * @return whether the spill started correctly
*/
- public synchronized void startSpilling() throws InterruptedException {
+ public synchronized boolean startSpilling() throws InterruptedException {
- while (!spillStart) {
+ while (!spillStart && !errorState) {
wait();
}
+ return !errorState;
}
/**
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataInputBuffer.java Wed Feb 6 14:08:26 2013
@@ -23,10 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
-import java.nio.channels.FileChannel.MapMode;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.Callable;
@@ -35,10 +32,8 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.hama.bsp.message.io.BufferReadStatus;
-import org.apache.hama.bsp.message.io.ReadIndexStatus;
-import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
-import org.apache.hama.bsp.message.io.SpilledDataReadStatus;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* <code>SpilledDataInputBuffer</code> class is designed to read from the
@@ -49,6 +44,8 @@ import org.apache.hama.bsp.message.io.Sp
*/
public class SpilledDataInputBuffer extends DataInputStream implements
DataInput {
+ private static final Log LOG = LogFactory
+ .getLog(SpilledDataInputBuffer.class);
/**
* The thread is used to asynchronously read from the spilled file and load
@@ -57,7 +54,7 @@ public class SpilledDataInputBuffer exte
static class SpillReadThread implements Callable<Boolean> {
private String fileName;
- private List<ByteBuffer> bufferList_;
+ private List<SpilledByteBuffer> bufferList_;
private long bytesToRead_;
private long bytesWrittenInFile_;
private SpilledDataReadStatus status_;
@@ -73,7 +70,7 @@ public class SpilledDataInputBuffer exte
* @param status The shared object that synchronizes the indexes for buffer
* to fill the data with.
*/
- public SpillReadThread(String fileName, List<ByteBuffer> bufferList,
+ public SpillReadThread(String fileName, List<SpilledByteBuffer> bufferList,
SpilledDataReadStatus status) {
this.fileName = fileName;
bufferList_ = bufferList;
@@ -92,7 +89,6 @@ public class SpilledDataInputBuffer exte
FileChannel fc = raf.getChannel();
bytesToRead_ = fc.size();
bytesWrittenInFile_ = bytesToRead_;
- MappedByteBuffer mBuffer = null;
long fileReadPos = 0;
int fileReadIndex = -1;
do {
@@ -105,13 +101,14 @@ public class SpilledDataInputBuffer exte
if (fileReadIndex < 0)
break;
- ByteBuffer buffer = bufferList_.get(fileReadIndex);
+ SpilledByteBuffer buffer = bufferList_.get(fileReadIndex);
buffer.clear();
long readSize = Math.min(buffer.remaining(),
(bytesWrittenInFile_ - fileReadPos));
-
- mBuffer = fc.map(MapMode.READ_ONLY, fileReadPos, readSize);
- buffer.put(mBuffer);
+ readSize = fc.read(buffer.getByteBuffer());
+ if (readSize < 0) {
+ break;
+ }
buffer.flip();
bytesToRead_ -= readSize;
fileReadPos += readSize;
@@ -136,7 +133,13 @@ public class SpilledDataInputBuffer exte
@Override
public Boolean call() throws Exception {
- keepReadingFromFile();
+ try {
+ keepReadingFromFile();
+ } catch (Exception e) {
+ LOG.error("Error reading from file: " + fileName, e);
+ status_.notifyError();
+ return Boolean.FALSE;
+ }
return Boolean.TRUE;
}
@@ -152,7 +155,7 @@ public class SpilledDataInputBuffer exte
static class SpilledInputStream extends InputStream {
private String fileName_;
- private List<ByteBuffer> bufferList_;
+ private List<SpilledByteBuffer> bufferList_;
private boolean spilledAlready_;
ReadIndexStatus status_;
private final byte[] readByte = new byte[1];
@@ -162,13 +165,14 @@ public class SpilledDataInputBuffer exte
private Callable<Boolean> spillReadThread_;
private Future<Boolean> spillReadState_;
private ExecutorService spillThreadService_;
- private ByteBuffer currentReadBuffer_;
+ private SpilledByteBuffer currentReadBuffer_;
private BitSet bufferBitState_;
private boolean closed_;
public SpilledInputStream(String fileName, boolean direct,
- List<ByteBuffer> bufferList, boolean hasSpilled) throws IOException {
+ List<SpilledByteBuffer> bufferList, boolean hasSpilled)
+ throws IOException {
fileName_ = fileName;
bufferList_ = bufferList;
spilledAlready_ = hasSpilled;
@@ -193,7 +197,9 @@ public class SpilledDataInputBuffer exte
(SpilledDataReadStatus) status_);
spillThreadService_ = Executors.newFixedThreadPool(1);
spillReadState_ = spillThreadService_.submit(spillReadThread_);
- status_.startReading();
+ if (!status_.startReading()) {
+ throw new IOException("Failed to read the spilled file: " + fileName_);
+ }
}
try {
currentReadBuffer_ = getNextBuffer();
@@ -217,7 +223,7 @@ public class SpilledDataInputBuffer exte
}
}
- public ByteBuffer getNextBuffer() throws InterruptedException {
+ public SpilledByteBuffer getNextBuffer() throws InterruptedException {
int index = status_.getReadBufferIndex();
if (index >= 0 && index < bufferList_.size()) {
return bufferList_.get(index);
@@ -362,7 +368,7 @@ public class SpilledDataInputBuffer exte
}
public static SpilledDataInputBuffer getSpilledDataInputBuffer(
- String fileName, boolean direct, List<ByteBuffer> bufferList)
+ String fileName, boolean direct, List<SpilledByteBuffer> bufferList)
throws IOException {
SpilledInputStream inStream = new SpilledInputStream(fileName, direct,
bufferList, true);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataProcessor.java Wed Feb 6 14:08:26 2013
@@ -17,33 +17,34 @@
*/
package org.apache.hama.bsp.message.io;
-import java.nio.ByteBuffer;
-
import org.apache.hadoop.conf.Configuration;
/**
- * Base interface defining the behaviour to process the spilled data provided
- * in a byte buffer.
+ * Base interface defining the behaviour to process the spilled data provided in
+ * a byte buffer.
*/
public interface SpilledDataProcessor {
/**
* Initialize the data processor.
+ *
* @param conf
* @return true if no errors.
*/
boolean init(Configuration conf);
-
+
/**
* Override the method to define the action to be taken on the spilled data
* provided in the byte buffer.
+ *
* @param buffer
* @return true if no errors.
*/
- boolean handleSpilledBuffer(ByteBuffer buffer);
-
+ boolean handleSpilledBuffer(SpilledByteBuffer buffer);
+
/**
* Close the data processor.
+ *
* @return true if no errors.
*/
boolean close();
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledDataReadStatus.java Wed Feb 6 14:08:26 2013
@@ -38,6 +38,7 @@ class SpilledDataReadStatus extends Read
private volatile boolean fileReadComplete_;
private volatile boolean bufferReadComplete_;
private volatile BitSet bufferBitState_;
+ private volatile boolean errorState_;
public SpilledDataReadStatus(int totalSize, BitSet bufferBitState) {
readBufferIndex_ = -1;
@@ -47,8 +48,22 @@ class SpilledDataReadStatus extends Read
bufferReadComplete_ = false;
totalSize_ = totalSize;
bufferBitState_ = bufferBitState;
+ errorState_ = false;
}
+ private int checkError(int index) {
+ if (errorState_) {
+ return -1;
+ } else {
+ return index;
+ }
+ }
+
+ public void notifyError() {
+ errorState_ = true;
+ notify();
+ }
+
@Override
public synchronized int getReadBufferIndex() throws InterruptedException {
@@ -59,14 +74,14 @@ class SpilledDataReadStatus extends Read
notify();
}
readBufferIndex_ = (readBufferIndex_ + 1) % totalSize_;
- while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_) {
+ while (!bufferBitState_.get(readBufferIndex_) && !fileReadComplete_ && !errorState_) {
wait();
}
// The file is completely read and transferred to buffers already.
if (bufferBitState_.isEmpty() && fileReadComplete_) {
return -1;
}
- return readBufferIndex_;
+ return checkError(readBufferIndex_);
}
@Override
@@ -91,7 +106,7 @@ class SpilledDataReadStatus extends Read
fetchFileBufferIndex_ = (fetchFileBufferIndex_ + 1) % totalSize_;
while (bufferBitState_.get(fetchFileBufferIndex_)
- && !bufferReadComplete_) {
+ && !bufferReadComplete_ && !errorState_) {
wait();
}
@@ -99,7 +114,7 @@ class SpilledDataReadStatus extends Read
return -1;
}
- return fetchFileBufferIndex_;
+ return checkError(fetchFileBufferIndex_);
}
@Override
@@ -121,14 +136,16 @@ class SpilledDataReadStatus extends Read
}
@Override
- public synchronized void startReading() {
- while (!spilledReadStart_)
+ public synchronized boolean startReading() {
+ while (!spilledReadStart_ && !errorState_) {
try {
wait();
} catch (InterruptedException e) {
LOG.error("Interrupted waiting to read the spilled file.", e);
throw new RuntimeException(e);
}
+ }
+ return !errorState_;
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java Wed Feb 6 14:08:26 2013
@@ -23,7 +23,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigInteger;
-import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.BitSet;
@@ -36,11 +35,6 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hama.bsp.message.io.SpillWriteIndexStatus;
-import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
-import org.apache.hama.bsp.message.io.SpilledDataProcessor;
-import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
-import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
/**
* <code>SpillingBuffer</code> is an output stream comprised of byte arrays that
@@ -56,7 +50,8 @@ import org.apache.hama.bsp.message.io.Wr
*/
public class SpillingDataOutputBuffer extends DataOutputStream {
- private static final Log LOG = LogFactory.getLog(SpillingDataOutputBuffer.class);
+ private static final Log LOG = LogFactory
+ .getLog(SpillingDataOutputBuffer.class);
/**
* This thread is responsible for writing from the ByteBuffers in the list to
@@ -64,13 +59,13 @@ public class SpillingDataOutputBuffer ex
*/
static class ProcessSpilledDataThread implements Callable<Boolean> {
private SpillWriteIndexStatus status_;
- private List<ByteBuffer> bufferList_;
+ private List<SpilledByteBuffer> bufferList_;
private long fileWrittenSize_;
private boolean closed;
SpilledDataProcessor processor;
ProcessSpilledDataThread(SpillWriteIndexStatus status,
- List<ByteBuffer> bufferList, SpilledDataProcessor processor) {
+ List<SpilledByteBuffer> bufferList, SpilledDataProcessor processor) {
status_ = status;
bufferList_ = bufferList;
closed = false;
@@ -86,6 +81,7 @@ public class SpillingDataOutputBuffer ex
private void keepProcessingData() throws IOException {
int fileWriteIndex = -1;
+
do {
try {
@@ -94,7 +90,7 @@ public class SpillingDataOutputBuffer ex
throw new IOException(e1);
}
while (fileWriteIndex >= 0) {
- ByteBuffer buffer = bufferList_.get(fileWriteIndex);
+ SpilledByteBuffer buffer = bufferList_.get(fileWriteIndex);
processor.handleSpilledBuffer(buffer);
buffer.clear();
try {
@@ -107,6 +103,9 @@ public class SpillingDataOutputBuffer ex
}
} while (!closed);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Done handling spilling data.");
+ }
}
@@ -128,7 +127,13 @@ public class SpillingDataOutputBuffer ex
@Override
public Boolean call() throws Exception {
- keepProcessingData();
+ try {
+ keepProcessingData();
+ } catch (Exception e) {
+ LOG.error("Error handling spilled data.", e);
+ status_.notifyError();
+ return Boolean.FALSE;
+ }
return Boolean.TRUE;
}
@@ -146,12 +151,13 @@ public class SpillingDataOutputBuffer ex
final byte[] b;
final boolean direct_;
- private List<ByteBuffer> bufferList_;
+ private List<SpilledByteBuffer> bufferList_;
private int bufferSize_;
private BitSet bufferState_;
private int numberBuffers_;
- private ByteBuffer currentBuffer_;
- private long bytesWritten_;
+ private SpilledByteBuffer currentBuffer_;
+ protected long bytesWritten_;
+ protected long bytesWrittenToBuffer;
private long bytesRemaining_;
private SpillWriteIndexStatus spillStatus_;
private int thresholdSize_;
@@ -159,8 +165,9 @@ public class SpillingDataOutputBuffer ex
private ProcessSpilledDataThread spillThread_;
private ExecutorService spillThreadService_;
private Future<Boolean> spillThreadState_;
- private boolean closed_;
+ private boolean closed_;;
+ private int interBufferEndOfRecord;
private SpilledDataProcessor processor;
/**
* The internal buffer where data is stored.
@@ -208,8 +215,8 @@ public class SpillingDataOutputBuffer ex
assert (threshold >= bufferSize);
assert (threshold < numBuffers * bufferSize);
- if(interBufferSize > bufferSize){
- interBufferSize = bufferSize/2;
+ if (interBufferSize > bufferSize) {
+ interBufferSize = bufferSize / 2;
}
defaultBufferSize_ = interBufferSize;
this.b = new byte[1];
@@ -218,15 +225,11 @@ public class SpillingDataOutputBuffer ex
direct_ = direct;
numberBuffers_ = numBuffers;
bufferSize_ = bufferSize;
- bufferList_ = new ArrayList<ByteBuffer>(numberBuffers_);
+ bufferList_ = new ArrayList<SpilledByteBuffer>(numberBuffers_);
bufferState_ = new BitSet(numBuffers);
for (int i = 0; i < numBuffers / 2; ++i) {
- if (direct_) {
- bufferList_.add(ByteBuffer.allocateDirect(bufferSize_));
- } else {
- bufferList_.add(ByteBuffer.allocate(bufferSize_));
- }
+ bufferList_.add(new SpilledByteBuffer(direct_, bufferSize_));
}
currentBuffer_ = bufferList_.get(0);
bytesWritten_ = 0L;
@@ -241,13 +244,19 @@ public class SpillingDataOutputBuffer ex
closed_ = false;
}
-
- public void clear() throws IOException{
+
+ public void markEndOfRecord() {
+ interBufferEndOfRecord = (int) (this.bytesWrittenToBuffer + count);
+ if (currentBuffer_.capacity() > interBufferEndOfRecord)
+ this.currentBuffer_.markEndOfRecord(interBufferEndOfRecord);
+ }
+
+ public void clear() throws IOException {
this.close();
startedSpilling_ = false;
bufferState_.clear();
- for (ByteBuffer aBufferList_ : bufferList_) {
+ for (SpilledByteBuffer aBufferList_ : bufferList_) {
aBufferList_.clear();
}
currentBuffer_ = bufferList_.get(0);
@@ -261,7 +270,7 @@ public class SpillingDataOutputBuffer ex
buf[count++] = (byte) (b & 0xFF);
return;
}
-
+
this.b[0] = (byte) (b & 0xFF);
write(this.b);
}
@@ -277,49 +286,19 @@ public class SpillingDataOutputBuffer ex
*
* @param len
* @throws InterruptedException
+ * @throws IOException
*/
- private void startSpilling() throws InterruptedException {
+ private void startSpilling() throws InterruptedException, IOException {
synchronized (this) {
spillThread_ = new ProcessSpilledDataThread(spillStatus_, bufferList_,
processor);
startedSpilling_ = true;
spillThreadService_ = Executors.newFixedThreadPool(1);
spillThreadState_ = spillThreadService_.submit(spillThread_);
- spillStatus_.startSpilling();
- }
- // }
- }
-
- public void perfectFillWrite(byte[] b, int off, int len) throws IOException {
- int rem = currentBuffer_.remaining();
- while (len > rem) {
- currentBuffer_.put(b, off, rem);
- // bytesWritten_ += len;
- // if (bytesWritten_ > thresholdSize_ && !startedSpilling_) {
- // try {
- // startSpilling(rem);
- // } catch (InterruptedException e) {
- // throw new IOException("Internal error occured writing to buffer.",
- // e);
- // }
- // }
-
- currentBuffer_.flip();
- int index = spillStatus_.getNextBufferIndex();
- currentBuffer_ = getBuffer(index);
- off += rem;
- len -= rem;
- rem = currentBuffer_.remaining();
+ if (!spillStatus_.startSpilling()) {
+ throw new IOException("Could not start spilling on disk.");
+ }
}
- currentBuffer_.put(b, off, len);
- // bytesWritten_ += len;
- // if (bytesWritten_ > thresholdSize_ && !startedSpilling_) {
- // try {
- // startSpilling(len);
- // } catch (InterruptedException e) {
- // throw new IOException("Internal error occured writing to buffer.", e);
- // }
- // }
}
@Override
@@ -341,10 +320,11 @@ public class SpillingDataOutputBuffer ex
count += len;
}
- private void writeInternal(byte[] b, int off, int len) throws IOException {
+ @SuppressWarnings("unused")
+ private void writeInternalImperfect(byte[] b, int off, int len)
+ throws IOException {
- bytesWritten_ += len;
- if (bytesWritten_ >= thresholdSize_ && !startedSpilling_) {
+ if (!startedSpilling_ && bytesWritten_ >= thresholdSize_) {
try {
startSpilling();
} catch (InterruptedException e) {
@@ -356,10 +336,50 @@ public class SpillingDataOutputBuffer ex
currentBuffer_.flip();
currentBuffer_ = getBuffer(spillStatus_.getNextBufferIndex());
bytesRemaining_ = bufferSize_;
+ this.bytesWrittenToBuffer = bytesWritten_;
+ }
+ currentBuffer_.put(b, off, len);
+ bytesRemaining_ -= len;
+ bytesWritten_ += len;
+
+ }
+
+ public void writeInternal(byte[] b, int off, int len) throws IOException {
+ int rem = currentBuffer_.remaining();
+ while (len > rem) {
+ currentBuffer_.put(b, off, rem);
+ bytesWritten_ += rem;
+ if (!startedSpilling_) {
+ checkSpillStart();
+ }
+ currentBuffer_.flip();
+ currentBuffer_ = getBuffer(spillStatus_.getNextBufferIndex());
+ if (currentBuffer_ == null)
+ throw new IOException(
+ "Error writing to spilling buffer. Could not get free buffer.");
+ bytesRemaining_ = bufferSize_;
+ this.bytesWrittenToBuffer = 0;
+ off += rem;
+ len -= rem;
+ rem = currentBuffer_.remaining();
}
currentBuffer_.put(b, off, len);
+ bytesWritten_ += len;
bytesRemaining_ -= len;
+ if (!startedSpilling_) {
+ checkSpillStart();
+ }
+ this.bytesWrittenToBuffer += len;
+ }
+ private void checkSpillStart() throws IOException {
+ if (bytesWritten_ >= thresholdSize_) {
+ try {
+ startSpilling();
+ } catch (InterruptedException e) {
+ throw new IOException("Internal error occured writing to buffer.", e);
+ }
+ }
}
/** Flush the internal buffer */
@@ -377,16 +397,13 @@ public class SpillingDataOutputBuffer ex
* @return
* @throws IOException
*/
- ByteBuffer getBuffer(int index) throws IOException {
-
+ SpilledByteBuffer getBuffer(int index) throws IOException {
+ if(index < 0){
+ return null;
+ }
if (index >= bufferList_.size()) {
- if (direct_) {
- bufferList_.add(index, ByteBuffer.allocateDirect(bufferSize_));
- } else {
- bufferList_.add(index, ByteBuffer.allocate(bufferSize_));
- }
+ bufferList_.add(new SpilledByteBuffer(direct_, bufferSize_));
}
-
return bufferList_.get(index);
}
@@ -422,7 +439,7 @@ public class SpillingDataOutputBuffer ex
this.processor.close();
this.spillThreadService_.shutdownNow();
}
-
+
}
}
@@ -432,28 +449,27 @@ public class SpillingDataOutputBuffer ex
* Initialize the spilling buffer with spilling file name
*
* @param fileName name of the file.
- * @throws FileNotFoundException
+ * @throws FileNotFoundException
*/
public SpillingDataOutputBuffer(String fileName) throws FileNotFoundException {
- super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
+ super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
new WriteSpilledDataProcessor(fileName)));
}
-
- public SpillingDataOutputBuffer(SpilledDataProcessor processor) throws FileNotFoundException {
- super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
- processor));
+
+ public SpillingDataOutputBuffer(SpilledDataProcessor processor)
+ throws FileNotFoundException {
+ super(new SpillingStream(3, 16 * 1024, 16 * 1024, true, processor));
}
-
-
/**
* Initializes the spilling buffer.
- * @throws FileNotFoundException
+ *
+ * @throws FileNotFoundException
*/
public SpillingDataOutputBuffer() throws FileNotFoundException {
super(new SpillingStream(3, 16 * 1024, 16 * 1024, true,
- new WriteSpilledDataProcessor(
- System.getProperty("java.io.tmpdir") + File.separatorChar
+ new WriteSpilledDataProcessor(System.getProperty("java.io.tmpdir")
+ + File.separatorChar
+ new BigInteger(128, new SecureRandom()).toString(32))));
}
@@ -465,32 +481,36 @@ public class SpillingDataOutputBuffer ex
* @param direct
* @param fileName
*/
- public SpillingDataOutputBuffer(int bufferCount, int bufferSize, int threshold,
- boolean direct, SpilledDataProcessor processor) {
+ public SpillingDataOutputBuffer(int bufferCount, int bufferSize,
+ int threshold, boolean direct, SpilledDataProcessor processor) {
super(new SpillingStream(bufferCount, bufferSize, threshold, direct,
processor));
}
-
- public void clear() throws IOException{
+
+ public void clear() throws IOException {
SpillingStream stream = (SpillingStream) this.out;
stream.clear();
}
-
- public boolean hasSpilled(){
+
+ public boolean hasSpilled() {
return ((SpillingStream) this.out).startedSpilling_;
}
+ public void markRecordEnd() {
+ ((SpillingStream) this.out).markEndOfRecord();
+ }
+
/**
* Provides an input stream to read from the spilling buffer.
*
* @throws IOException
*/
- public SpilledDataInputBuffer getInputStreamToRead(String fileName) throws IOException {
+ public SpilledDataInputBuffer getInputStreamToRead(String fileName)
+ throws IOException {
SpillingStream stream = (SpillingStream) this.out;
SpilledDataInputBuffer.SpilledInputStream inStream = new SpilledDataInputBuffer.SpilledInputStream(
- fileName, stream.direct_, stream.bufferList_,
- stream.startedSpilling_);
+ fileName, stream.direct_, stream.bufferList_, stream.startedSpilling_);
inStream.prepareRead();
return new SpilledDataInputBuffer(inStream);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/WriteSpilledDataProcessor.java Wed Feb 6 14:08:26 2013
@@ -17,10 +17,10 @@
*/
package org.apache.hama.bsp.message.io;
+import java.io.File;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log;
@@ -28,8 +28,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
/**
- *
- *
+ * A {@link SpilledDataProcessor} that writes the spilled data to the file.
*/
public class WriteSpilledDataProcessor implements SpilledDataProcessor {
@@ -37,28 +36,43 @@ public class WriteSpilledDataProcessor i
.getLog(WriteSpilledDataProcessor.class);
private FileChannel fileChannel;
- private RandomAccessFile raf;
private String fileName;
-
+
public WriteSpilledDataProcessor(String fileName)
throws FileNotFoundException {
this.fileName = fileName;
- raf = new RandomAccessFile(fileName, "rw");
- fileChannel = raf.getChannel();
+ }
+
+ private void initializeFileChannel() {
+ FileOutputStream stream;
+ try {
+ stream = new FileOutputStream(new File(fileName), true);
+ } catch (FileNotFoundException e) {
+ LOG.error("Error opening file to write spilled data.", e);
+ throw new RuntimeException(e);
+ }
+ fileChannel = stream.getChannel();
}
@Override
public boolean init(Configuration conf) {
+
return true;
}
@Override
- public boolean handleSpilledBuffer(ByteBuffer buffer) {
+ public boolean handleSpilledBuffer(SpilledByteBuffer buffer) {
try {
- fileChannel.write(buffer);
+
+ if(fileChannel == null){
+ initializeFileChannel();
+ }
+
+ fileChannel.write(buffer.getByteBuffer());
+ fileChannel.force(true);
return true;
} catch (IOException e) {
- LOG.error("Error writing to file:"+fileName, e);
+ LOG.error("Error writing to file:" + fileName, e);
}
return false;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Wed Feb 6 14:08:26 2013
@@ -305,4 +305,9 @@ public final class DiskQueue<M extends W
.getJobID().toString()), id.getTaskID().toString());
}
+ @Override
+ public boolean isMessageSerialized() {
+ return false;
+ }
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Wed Feb 6 14:08:26 2013
@@ -103,4 +103,9 @@ public final class MemoryQueue<M extends
}
+ @Override
+ public boolean isMessageSerialized() {
+ return false;
+ }
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Wed Feb 6 14:08:26 2013
@@ -79,5 +79,11 @@ public interface MessageQueue<M> extends
* @return how many items are in the queue.
*/
public int size();
+
+ /**
+ *
+ * @return true if the messages in the queue are serialized to byte buffers.
+ */
+ public boolean isMessageSerialized();
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java Wed Feb 6 14:08:26 2013
@@ -105,4 +105,9 @@ public final class SortedMessageQueue<M
}
+ @Override
+ public boolean isMessageSerialized() {
+ return false;
+ }
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Wed Feb 6 14:08:26 2013
@@ -31,16 +31,17 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
import org.apache.hama.bsp.message.io.PreFetchCache;
import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
import org.apache.hama.bsp.message.io.SpilledDataProcessor;
import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
-import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
/**
*
- *
+ *
* @param <M>
*/
public class SpillingQueue<M extends Writable> implements MessageQueue<M> {
@@ -67,26 +68,25 @@ public class SpillingQueue<M extends Wri
private SpilledDataInputBuffer spilledInput;
private boolean objectWritableMode;
private ObjectWritable objectWritable;
-
+
private Class<M> messageClass;
private PreFetchCache<M> prefetchCache;
private boolean enablePrefetch;
-
private class SpillIterator implements Iterator<M> {
private boolean objectMode;
private Class<M> classObject;
private M messageHolder;
-
- public SpillIterator(boolean mode, Class<M> classObj, Configuration conf){
+
+ public SpillIterator(boolean mode, Class<M> classObj, Configuration conf) {
this.objectMode = mode;
this.classObject = classObj;
- if(classObject != null){
+ if (classObject != null) {
messageHolder = ReflectionUtils.newInstance(classObj, conf);
}
}
-
+
@Override
public boolean hasNext() {
return numMessagesRead != numMessagesWritten && numMessagesWritten > 0;
@@ -94,10 +94,9 @@ public class SpillingQueue<M extends Wri
@Override
public M next() {
- if(objectMode){
+ if (objectMode) {
return poll();
- }
- else {
+ } else {
return poll(messageHolder);
}
}
@@ -134,6 +133,7 @@ public class SpillingQueue<M extends Wri
} else {
msg.write(spillOutputBuffer);
}
+ spillOutputBuffer.markRecordEnd();
++numMessagesWritten;
} catch (IOException e) {
LOG.error("Error adding message.", e);
@@ -189,19 +189,21 @@ public class SpillingQueue<M extends Wri
public void init(Configuration conf, TaskAttemptID arg1) {
bufferCount = conf.getInt(SPILLBUFFER_COUNT, 3);
- bufferSize = conf.getInt(SPILLBUFFER_SIZE, 16 * 1024);
+ bufferSize = conf.getInt(SPILLBUFFER_SIZE, Constants.BUFFER_DEFAULT_SIZE);
direct = conf.getBoolean(SPILLBUFFER_DIRECT, true);
- threshold = conf.getInt(SPILLBUFFER_THRESHOLD, 16 * 1024);
+ threshold = conf.getInt(SPILLBUFFER_THRESHOLD,
+ Constants.BUFFER_DEFAULT_SIZE);
fileName = conf.get(SPILLBUFFER_FILENAME,
System.getProperty("java.io.tmpdir") + File.separatorChar
+ new BigInteger(128, new SecureRandom()).toString(32));
-
- messageClass = (Class<M>) conf.getClass(SPILLBUFFER_MSGCLASS, null);
+
+ messageClass = (Class<M>) conf.getClass(Constants.MESSAGE_CLASS, null);
objectWritableMode = messageClass == null;
-
+
SpilledDataProcessor processor;
try {
- processor = new WriteSpilledDataProcessor(fileName);
+ processor = new CombineSpilledDataProcessor<M>(fileName);
+ processor.init(conf);
} catch (FileNotFoundException e) {
LOG.error("Error initializing spilled data stream.", e);
throw new RuntimeException(e);
@@ -212,14 +214,14 @@ public class SpillingQueue<M extends Wri
objectWritable.setConf(conf);
this.conf = conf;
}
-
- private void incReadMsgCount(){
+
+ private void incReadMsgCount() {
++numMessagesRead;
}
-
+
@SuppressWarnings("unchecked")
- private M readDirect(M msg){
- if(numMessagesRead >= numMessagesWritten){
+ private M readDirect(M msg) {
+ if (numMessagesRead >= numMessagesWritten) {
return null;
}
try {
@@ -239,19 +241,18 @@ public class SpillingQueue<M extends Wri
}
public M poll(M msg) {
- if(numMessagesRead >= numMessagesWritten){
+ if (numMessagesRead >= numMessagesWritten) {
return null;
}
- if(enablePrefetch){
+ if (enablePrefetch) {
return readFromPrefetch(msg);
- }
- else {
+ } else {
return readDirect(msg);
}
}
-
+
@SuppressWarnings("unchecked")
- private M readDirectObjectWritable(){
+ private M readDirectObjectWritable() {
if (!objectWritableMode) {
throw new IllegalStateException(
"API call not supported. Set the configuration property "
@@ -266,34 +267,32 @@ public class SpillingQueue<M extends Wri
}
return (M) objectWritable.get();
}
-
+
@SuppressWarnings({ "unchecked" })
- private M readFromPrefetch(M msg){
- if(objectWritableMode){
+ private M readFromPrefetch(M msg) {
+ if (objectWritableMode) {
this.objectWritable = (ObjectWritable) prefetchCache.get();
incReadMsgCount();
- return (M)this.objectWritable.get();
- }
- else {
+ return (M) this.objectWritable.get();
+ } else {
incReadMsgCount();
- return (M)this.prefetchCache.get();
+ return (M) this.prefetchCache.get();
}
-
+
}
@Override
public M poll() {
- if(numMessagesRead >= numMessagesWritten){
+ if (numMessagesRead >= numMessagesWritten) {
return null;
}
-
- if(enablePrefetch){
+
+ if (enablePrefetch) {
M msg = readFromPrefetch(null);
- if(msg != null)
+ if (msg != null)
incReadMsgCount();
return msg;
- }
- else {
+ } else {
return readDirectObjectWritable();
}
}
@@ -312,7 +311,7 @@ public class SpillingQueue<M extends Wri
LOG.error("Error initializing the input spilled stream", e);
throw new RuntimeException(e);
}
- if(conf.getBoolean(ENABLE_PREFETCH, false)){
+ if (conf.getBoolean(ENABLE_PREFETCH, false)) {
this.prefetchCache = new PreFetchCache<M>(numMessagesWritten);
this.enablePrefetch = true;
try {
@@ -320,6 +319,8 @@ public class SpillingQueue<M extends Wri
} catch (InterruptedException e) {
LOG.error("Error starting prefetch on message queue.", e);
throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
}
@@ -333,5 +334,12 @@ public class SpillingQueue<M extends Wri
public int size() {
return numMessagesWritten;
}
+
+
+ @Override
+ public boolean isMessageSerialized() {
+ return true;
+ }
+
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java?rev=1442969&r1=1442968&r2=1442969&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java Wed Feb 6 14:08:26 2013
@@ -19,16 +19,34 @@ package org.apache.hama.bsp.message;
import java.io.EOFException;
import java.io.File;
+import java.io.RandomAccessFile;
import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.security.SecureRandom;
+import java.util.Iterator;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
+import org.apache.hama.bsp.message.io.DirectByteBufferInputStream;
+import org.apache.hama.bsp.message.io.DirectByteBufferOutputStream;
+import org.apache.hama.bsp.message.io.ReusableByteBuffer;
+import org.apache.hama.bsp.message.io.SpilledByteBuffer;
import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
+import org.apache.hama.bsp.message.io.SpilledDataProcessor;
import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
+import org.apache.hama.bsp.message.io.SyncFlushByteBufferOutputStream;
+import org.apache.hama.bsp.message.io.SyncReadByteBufferInputStream;
import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;
-import junit.framework.TestCase;
-
public class TestMessageIO extends TestCase {
public void testNonSpillBuffer() throws Exception {
@@ -39,23 +57,22 @@ public class TestMessageIO extends TestC
for (int i = 0; i < 100; ++i) {
text.write(outputBuffer);
}
-
assertTrue(outputBuffer != null);
assertTrue(outputBuffer.size() == 4000);
assertFalse(outputBuffer.hasSpilled());
-
outputBuffer.close();
-
}
public void testSpillBuffer() throws Exception {
+ Configuration conf = new HamaConfiguration();
String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
+ new BigInteger(128, new SecureRandom()).toString(32);
+ SpilledDataProcessor processor = new WriteSpilledDataProcessor(fileName);
+ processor.init(conf);
SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
- 1024, 1024, true, new WriteSpilledDataProcessor(fileName));
+ 1024, 1024, true, processor);
Text text = new Text("Testing the spillage of spilling buffer");
-
for (int i = 0; i < 100; ++i) {
text.write(outputBuffer);
}
@@ -70,48 +87,398 @@ public class TestMessageIO extends TestC
}
- public void testSpillInputStream() throws Exception {
- String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
- + new BigInteger(128, new SecureRandom()).toString(32);
- SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
- 1024, 1024, true, new WriteSpilledDataProcessor(fileName));
- Text text = new Text("Testing the spillage of spilling buffer");
+ public static class SumCombiner extends Combiner<IntWritable> {
+ @Override
+ public IntWritable combine(Iterable<IntWritable> messages) {
+ int sum = 0;
+ for (IntWritable intObj : messages) {
+ sum += intObj.get();
+ }
+ return new IntWritable(sum);
+ }
+
+ }
+
+ public void testSpillingByteBuffer() throws Exception {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+ SpilledByteBuffer spillBuffer = new SpilledByteBuffer(buffer);
for (int i = 0; i < 100; ++i) {
- text.write(outputBuffer);
+ spillBuffer.putInt(i);
+ spillBuffer.markEndOfRecord();
}
+ spillBuffer.putInt(100);
+ assertEquals(spillBuffer.getMarkofLastRecord(), 400);
+ assertEquals(spillBuffer.remaining(), (512 - 404));
+ spillBuffer.flip();
+ assertEquals(spillBuffer.remaining(), 404);
+ assertEquals(spillBuffer.getMarkofLastRecord(), 400);
- assertTrue(outputBuffer != null);
- assertTrue(outputBuffer.size() == 4000);
- assertTrue(outputBuffer.hasSpilled());
- File f = new File(fileName);
- assertTrue(f.exists());
- outputBuffer.close();
- assertTrue(f.length() == 4000L);
+ }
- SpilledDataInputBuffer inputBuffer = outputBuffer
- .getInputStreamToRead(fileName);
+ public void testDirectByteBufferOutput() throws Exception {
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+ DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
+ stream.setBuffer(buffer);
+ IntWritable intWritable = new IntWritable(1);
for (int i = 0; i < 100; ++i) {
- text.readFields(inputBuffer);
- assertTrue("Testing the spillage of spilling buffer".equals(text
- .toString()));
- text.clear();
+ intWritable.set(i);
+ intWritable.write(stream);
+ }
+
+ stream.close();
+
+ buffer.flip();
+ for (int i = 0; i < 100; ++i) {
+ assertEquals(i, buffer.getInt());
}
try {
- text.readFields(inputBuffer);
+ buffer.getInt();
assertTrue(false);
- } catch (EOFException eof) {
+ } catch (Exception e) {
assertTrue(true);
}
- inputBuffer.close();
- inputBuffer.completeReading(false);
- assertTrue(f.exists());
- inputBuffer.completeReading(true);
- assertFalse(f.exists());
+ }
+
+ public void testDirectByteBufferInput() throws Exception {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+ DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
+ stream.setBuffer(buffer);
+ IntWritable intWritable = new IntWritable(1);
+
+ for (int i = 0; i < 100; ++i) {
+ intWritable.set(i);
+ intWritable.write(stream);
+ }
+ intWritable.write(stream);
+
+ stream.close();
+
+ buffer.flip();
+
+ DirectByteBufferInputStream inStream = new DirectByteBufferInputStream();
+
+ inStream.setBuffer(new SpilledByteBuffer(buffer, 400));
+ for (int i = 0; i < 100; ++i) {
+ intWritable.readFields(inStream);
+ assertEquals(i, intWritable.get());
+ }
+
+ assertFalse(inStream.hasDataToRead());
+ assertTrue(inStream.hasUnmarkData());
+ inStream.prepareForNext();
+
+ // push in another buffer and check if the unmarked data could be read.
+
+ buffer.clear();
+ stream = new DirectByteBufferOutputStream();
+ buffer = ByteBuffer.allocateDirect(2048);
+ stream.setBuffer(buffer);
+
+ for (int i = 0; i < 400; ++i) {
+ intWritable.set(i);
+ intWritable.write(stream);
+ }
+ stream.close();
+ buffer.flip();
+
+ inStream.setBuffer(new SpilledByteBuffer(buffer, 400));
+
+ // Read previous data
+ intWritable.readFields(inStream);
+ assertEquals(99, intWritable.get());
+
+ for (int i = 0; i < 100; ++i) {
+ intWritable.readFields(inStream);
+ assertEquals(i, intWritable.get());
+ }
+
+ assertFalse(inStream.hasDataToRead());
+ assertTrue(inStream.hasUnmarkData());
+ inStream.prepareForNext();
+
+ buffer.clear();
+ stream = new DirectByteBufferOutputStream();
+ stream.setBuffer(buffer);
+ for (int i = 0; i < 100; ++i) {
+ intWritable.set(i);
+ intWritable.write(stream);
+ }
+ stream.close();
+ buffer.flip();
+
+ inStream.setBuffer(new SpilledByteBuffer(buffer, 400));
+
+ // Read previous data with resized intermediate buffer
+ for (int i = 100; i < 400; ++i) {
+ intWritable.readFields(inStream);
+ assertEquals(i, intWritable.get());
+ }
+
+ for (int i = 0; i < 100; ++i) {
+ intWritable.readFields(inStream);
+ assertEquals(i, intWritable.get());
+ }
+
+ assertFalse(inStream.hasDataToRead());
+ assertFalse(inStream.hasUnmarkData());
+
+ }
+
+ /**
+ *
+ * @throws Exception
+ */
+ public void testReusableByteBufferIter() throws Exception {
+
+ ReusableByteBuffer<IntWritable> reuseByteBuffer = new ReusableByteBuffer<IntWritable>(
+ new IntWritable());
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+ DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
+ stream.setBuffer(buffer);
+ IntWritable intWritable = new IntWritable(1);
+
+ for (int i = 0; i < 100; ++i) {
+ intWritable.set(i);
+ intWritable.write(stream);
+ }
+ intWritable.write(stream);
+ stream.close();
+ buffer.flip();
+ reuseByteBuffer.set(new SpilledByteBuffer(buffer, 400));
+
+ Iterator<IntWritable> iter = reuseByteBuffer.iterator();
+ int j = 0;
+ while (iter.hasNext()) {
+ assertEquals(iter.next().get(), j++);
+ }
+ assertEquals(j, 100);
+ reuseByteBuffer.prepareForNext();
+
+ buffer.clear();
+
+ stream = new DirectByteBufferOutputStream();
+ stream.setBuffer(buffer);
+
+ for (int i = 0; i < 101; ++i) {
+ intWritable.set(i);
+ intWritable.write(stream);
+ }
+ stream.close();
+ buffer.flip();
+
+ reuseByteBuffer.set(new SpilledByteBuffer(buffer, 404));
+ iter = reuseByteBuffer.iterator();
+ assertEquals(iter.next().get(), 99);
+
+ j = 0;
+ while (iter.hasNext()) {
+ assertEquals(iter.next().get(), j++);
+ }
+ buffer.clear();
+ }
+
+ public void testCombineProcessor() throws Exception {
+ String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
+ + new BigInteger(128, new SecureRandom()).toString(32);
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+ DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
+ stream.setBuffer(buffer);
+ IntWritable intWritable = new IntWritable(1);
+ int sum = 0;
+ for (int i = 0; i < 100; ++i) {
+ intWritable.set(i);
+ intWritable.write(stream);
+ sum += i;
+ }
+ intWritable.write(stream);
+ stream.close();
+ buffer.flip();
+
+ Configuration conf = new HamaConfiguration();
+
+ conf.setClass(Constants.MESSAGE_CLASS, IntWritable.class, Writable.class);
+ conf.setClass(Constants.COMBINER_CLASS, SumCombiner.class, Combiner.class);
+
+ CombineSpilledDataProcessor<IntWritable> processor = new CombineSpilledDataProcessor<IntWritable>(
+ fileName);
+ assertTrue(processor.init(conf));
+ File f = new File(fileName);
+ try {
+ assertTrue(processor.handleSpilledBuffer(new SpilledByteBuffer(buffer,
+ 400)));
+ buffer.flip();
+ assertTrue(processor.handleSpilledBuffer(new SpilledByteBuffer(buffer,
+ 400)));
+ assertTrue(processor.close());
+
+ assertTrue(f.exists());
+ assertEquals(f.length(), 8);
+
+ RandomAccessFile raf = new RandomAccessFile(fileName, "r");
+ FileChannel fileChannel = raf.getChannel();
+ ByteBuffer readBuff = ByteBuffer.allocateDirect(16);
+ fileChannel.read(readBuff);
+ readBuff.flip();
+ assertEquals(readBuff.getInt(), sum);
+ assertEquals(readBuff.getInt(), sum + 99);
+ } finally {
+ assertTrue(f.delete());
+ }
+
+ }
+
+ public void testSpillInputStream() throws Exception {
+
+ File f = null;
+ try {
+ String fileName = System.getProperty("java.io.tmpdir")
+ + File.separatorChar + "testSpillInputStream.txt";
+ Configuration conf = new HamaConfiguration();
+ SpilledDataProcessor processor = new WriteSpilledDataProcessor(fileName);
+ processor.init(conf);
+ SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
+ 1024, 1024, true, processor);
+ Text text = new Text("Testing the spillage of spilling buffer");
+ for (int i = 0; i < 100; ++i) {
+ text.write(outputBuffer);
+ outputBuffer.markRecordEnd();
+ }
+
+ assertTrue(outputBuffer != null);
+ assertTrue(outputBuffer.size() == 4000);
+ assertTrue(outputBuffer.hasSpilled());
+ f = new File(fileName);
+ assertTrue(f.exists());
+ outputBuffer.close();
+ assertTrue(f.length() == 4000);// + (4000 / 1024 + 1) * 4));
+
+ SpilledDataInputBuffer inputBuffer = outputBuffer
+ .getInputStreamToRead(fileName);
+
+ for (int i = 0; i < 100; ++i) {
+ text.readFields(inputBuffer);
+ assertTrue("Testing the spillage of spilling buffer".equals(text
+ .toString()));
+ text.clear();
+ }
+
+ try {
+ text.readFields(inputBuffer);
+ assertTrue(false);
+ } catch (EOFException eof) {
+ assertTrue(true);
+ }
+
+ inputBuffer.close();
+ inputBuffer.completeReading(false);
+ assertTrue(f.exists());
+ inputBuffer.completeReading(true);
+ assertFalse(f.exists());
+ } finally {
+ if (f != null) {
+ if (f.exists()) {
+ f.delete();
+ }
+ }
+ }
+
+ }
+
+ public void testSyncFlushByteBufferOutputStream() throws Exception {
+
+ File f = null;
+ try {
+ String fileName = System.getProperty("java.io.tmpdir")
+ + File.separatorChar + "testSyncFlushByteBufferOutputStream.txt";
+ SyncFlushByteBufferOutputStream stream = new SyncFlushByteBufferOutputStream(
+ fileName);
+ DirectByteBufferOutputStream syncFlushStream = new DirectByteBufferOutputStream(
+ stream);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+ syncFlushStream.setBuffer(buffer);
+ IntWritable intWritable = new IntWritable(1);
+
+ for (int i = 0; i < 200; ++i) {
+ intWritable.set(i);
+ intWritable.write(syncFlushStream);
+ }
+ intWritable.write(syncFlushStream);
+ syncFlushStream.close();
+
+ f = new File(fileName);
+ assertTrue(f.exists());
+ assertTrue(f.length() == 804);
+ assertTrue(f.delete());
+ } finally {
+ if (f != null) {
+ f.delete();
+ }
+ }
+
+ }
+
+ public void testSyncFlushBufferInputStream() throws Exception {
+ File f = null;
+ try {
+ String fileName = System.getProperty("java.io.tmpdir")
+ + File.separatorChar + "testSyncFlushBufferInputStream.txt";
+ SyncFlushByteBufferOutputStream stream = new SyncFlushByteBufferOutputStream(
+ fileName);
+ DirectByteBufferOutputStream syncFlushStream = new DirectByteBufferOutputStream(
+ stream);
+ ByteBuffer buffer = ByteBuffer.allocateDirect(512);
+ syncFlushStream.setBuffer(buffer);
+ IntWritable intWritable = new IntWritable(1);
+
+ for (int i = 0; i < 200; ++i) {
+ intWritable.set(i);
+ intWritable.write(syncFlushStream);
+ }
+ intWritable.write(syncFlushStream);
+ syncFlushStream.close();
+
+ f = new File(fileName);
+ assertTrue(f.exists());
+ assertEquals(f.length(), 804);
+
+ SyncReadByteBufferInputStream syncReadStream = new SyncReadByteBufferInputStream(
+ stream.isSpilled(), fileName);
+ DirectByteBufferInputStream inStream = new DirectByteBufferInputStream(
+ syncReadStream);
+ buffer.clear();
+ inStream.setBuffer(buffer);
+
+ for (int i = 0; i < 200; ++i) {
+ intWritable.readFields(inStream);
+ assertEquals(intWritable.get(), i);
+ }
+
+ intWritable.readFields(inStream);
+ assertEquals(intWritable.get(), 199);
+
+ try {
+ intWritable.readFields(inStream);
+ assertFalse(true);
+ } catch (Exception e) {
+ assertTrue(true);
+ }
+
+ inStream.close();
+ syncFlushStream.close();
+
+ } finally {
+ if (f != null) {
+ f.delete();
+ }
+ }
}
}