You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/02/06 20:23:43 UTC
svn commit: r1443152 -
/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/
Author: surajsmenon
Date: Wed Feb 6 19:23:42 2013
New Revision: 1443152
URL: http://svn.apache.org/viewvc?rev=1443152&view=rev
Log:
Fixing the build after [HAMA-723] final patch.
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * <code>ByteBufferInputStream</code> is used to read back from a
+ * SpilledByteBuffer. The goal of this class is to read data up to a marked
+ * record boundary. The remaining data, which is the partial record left in the
+ * byte buffer, is read into the intermediate buffer using
+ * {@link ByteBufferInputStream#fillForNext()}. If this stream is to be used as
+ * an intermediate buffer to read from a bigger source of data, say a file, the
+ * function {@link ByteBufferInputStream#onBufferRead(byte[], int, int, int)}
+ * could be used to fill up the ByteBuffer encapsulated by the class.
+ */
+public class ByteBufferInputStream extends InputStream {
+
+  /** Scratch array backing the single-byte {@link #read()}. */
+  private final byte[] readByte = new byte[1];
+  /** Capacity of the intermediate buffer as tracked by this class. */
+  private int interBuffSize;
+  /** Intermediate buffer that data is staged in before being handed out. */
+  private byte[] interBuffer;
+  /** Number of unread bytes staged in {@link #interBuffer}; never negative. */
+  private int dataSizeInInterBuffer;
+  /** Index of the next unread byte inside {@link #interBuffer}. */
+  private int curPos;
+
+  protected ByteBuffer buffer;
+  private long toBeRead;
+  private long bytesRead;
+  private long totalBytes;
+
+  public ByteBufferInputStream() {
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  /**
+   * @return true while fewer bytes have been handed out than the "to read"
+   *         count recorded by the last
+   *         {@link #setBuffer(ByteBuffer, long, long)} call.
+   */
+  public boolean hasDataToRead() {
+    return ((toBeRead - bytesRead) > 0);
+  }
+
+  /**
+   * @return true while fewer bytes have been handed out than the total count
+   *         recorded by the last {@link #setBuffer(ByteBuffer, long, long)}
+   *         call.
+   */
+  public boolean hasUnmarkedData() {
+    return ((totalBytes - bytesRead) > 0);
+  }
+
+  /**
+   * Reads a single byte.
+   *
+   * @return the byte as a value in the range 0-255, or -1 when no more data
+   *         is available.
+   */
+  @Override
+  public int read() throws IOException {
+    if (dataSizeInInterBuffer > 0) {
+      --dataSizeInInterBuffer;
+      ++bytesRead;
+      return interBuffer[curPos++] & 0xFF;
+    }
+
+    // Anything other than a positive count means no byte was produced, so
+    // the scratch array must not be returned.
+    if (read(readByte, 0, 1) <= 0) {
+      return -1;
+    }
+    return readByte[0] & 0xFF;
+  }
+
+  /**
+   * Reads up to <code>len</code> bytes into <code>b</code>, refilling the
+   * intermediate buffer from the encapsulated ByteBuffer as needed.
+   *
+   * @return the number of bytes copied, 0 if <code>len</code> is 0, or -1 if
+   *         no byte could be read because the data is exhausted.
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (dataSizeInInterBuffer >= len) {
+      // The request is fully served from the staged bytes.
+      System.arraycopy(interBuffer, curPos, b, off, len);
+      dataSizeInInterBuffer -= len;
+      curPos += len;
+      bytesRead += len;
+      return len;
+    }
+    if (interBuffer == null) {
+      // No buffer has been set yet, so there is nothing to read.
+      return -1;
+    }
+    int size = 0;
+
+    while (len > 0) {
+      if (dataSizeInInterBuffer == 0) {
+        int fetched = readInternal(interBuffer, 0, interBuffer.length);
+        curPos = 0;
+        if (fetched <= 0) {
+          // Keep the staged-byte counter at 0 rather than storing the -1
+          // end marker, so later calls see a consistent state.
+          break;
+        }
+        dataSizeInInterBuffer = fetched;
+      }
+      int readSize = Math.min(dataSizeInInterBuffer, len);
+      System.arraycopy(interBuffer, curPos, b, off, readSize);
+      len -= readSize;
+      off += readSize;
+      size += readSize;
+      dataSizeInInterBuffer -= readSize;
+      curPos += readSize;
+    }
+    bytesRead += size;
+    // InputStream contract: report end of data with -1, never with 0.
+    return size > 0 ? size : -1;
+  }
+
+  /**
+   * Copies bytes straight from the encapsulated ByteBuffer into
+   * <code>b</code>, bypassing the intermediate buffer. When the ByteBuffer
+   * runs out before <code>len</code> bytes were copied, the result of
+   * {@link #onBufferRead(byte[], int, int, int)} is returned.
+   *
+   * @return the number of bytes copied, or -1 if no buffer is set.
+   */
+  public int readInternal(byte[] b, int off, int len) throws IOException {
+    if (buffer == null) {
+      return -1;
+    }
+    int cur = 0;
+    while (len > 0) {
+      int rem = buffer.remaining();
+      if (rem == 0) {
+        return onBufferRead(b, off, len, cur);
+      }
+      int readSize = Math.min(rem, len);
+      buffer.get(b, off, readSize);
+      len -= readSize;
+      off += readSize;
+      cur += readSize;
+    }
+    return cur;
+  }
+
+  /**
+   * When the byte buffer encapsulated is out of data then this function is
+   * invoked.
+   *
+   * @param b the byte array to read into
+   * @param off offset index to start writing
+   * @param len length of data still to be written
+   * @param cur The current size already read by the class.
+   * @return if the end of the stream has reached, and cur is 0 return -1; else
+   *         return the data size currently read.
+   * @throws IOException
+   */
+  protected int onBufferRead(byte[] b, int off, int len, int cur)
+      throws IOException {
+    if (cur != 0)
+      return cur;
+    else
+      return -1;
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * Sets the byte buffer to read the data from. Bytes still staged in the
+   * intermediate buffer are counted on top of <code>toRead</code>.
+   *
+   * @param buffer The byte buffer to read data from
+   * @param toRead Number of bytes till end of last record.
+   * @param total Total bytes of data to read in the buffer.
+   * @throws IOException
+   */
+  public void setBuffer(ByteBuffer buffer, long toRead, long total)
+      throws IOException {
+    this.buffer = buffer;
+    toBeRead = toRead + dataSizeInInterBuffer;
+    totalBytes = total;
+    bytesRead = 0;
+    // A zero-length intermediate buffer (allocated when an earlier buffer was
+    // empty) can never stage data, so it is re-allocated as well.
+    if (interBuffer == null || interBuffer.length == 0) {
+      interBuffSize = Math.min(buffer.remaining(), 8192);
+      interBuffer = new byte[interBuffSize];
+      dataSizeInInterBuffer = 0;
+    }
+    fetchIntermediate();
+  }
+
+  /**
+   * Tops up the intermediate buffer behind the bytes already staged and
+   * rewinds the read position to the start of the intermediate buffer.
+   */
+  private void fetchIntermediate() throws IOException {
+    int readSize = readInternal(interBuffer, dataSizeInInterBuffer,
+        interBuffer.length - dataSizeInInterBuffer);
+    if (readSize > 0) {
+      dataSizeInInterBuffer += readSize;
+    }
+    curPos = 0;
+  }
+
+  /**
+   * This function should be called to provision reading the partial records
+   * into the buffer after the last record in the buffer is read. This data
+   * would be appended with the next ByteBuffer that is set using
+   * {@link ByteBufferInputStream#setBuffer(ByteBuffer, long, long)} to start
+   * reading records.
+   *
+   * @throws IOException
+   */
+  public void fillForNext() throws IOException {
+
+    int remainingBytes = buffer.remaining();
+    if (curPos != 0) {
+      // Move the unread staged bytes to the front of the intermediate buffer.
+      System.arraycopy(interBuffer, curPos, interBuffer, 0,
+          dataSizeInInterBuffer);
+    }
+    curPos = 0;
+    if (remainingBytes == 0)
+      return;
+
+    if (dataSizeInInterBuffer + remainingBytes > interBuffSize) {
+      // Grow the intermediate buffer so that the rest of the ByteBuffer fits.
+      interBuffSize = dataSizeInInterBuffer + remainingBytes;
+      byte[] arr = this.interBuffer;
+      this.interBuffer = new byte[interBuffSize];
+      System.arraycopy(arr, 0, interBuffer, 0, dataSizeInInterBuffer);
+    }
+    int readSize = readInternal(this.interBuffer, dataSizeInInterBuffer,
+        remainingBytes);
+    if (readSize > 0)
+      dataSizeInInterBuffer += readSize;
+  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An output stream that collects small writes in an intermediate byte array
+ * and moves them into an encapsulated {@link ByteBuffer}. Subclasses override
+ * {@link ByteBufferOutputStream#onBufferFull(byte[], int, int)} to deal with
+ * data that does not fit into the remaining space of the byte buffer. The
+ * default implementation lets the put proceed, so the byte buffer itself
+ * raises the overflow exception.
+ */
+class ByteBufferOutputStream extends OutputStream {
+
+  /** Scratch array used by the single-byte write. */
+  private final byte[] singleByte = new byte[1];
+  /** Intermediate array where small writes are collected. */
+  private byte[] staging;
+  /** Number of bytes currently held in the intermediate array. */
+  private int stagedBytes;
+  protected ByteBuffer buffer;
+
+  /**
+   * Clears the byte buffer, if one is set, and discards the bytes collected
+   * in the intermediate array.
+   */
+  public void clear() {
+    if (null != this.buffer) {
+      this.buffer.clear();
+    }
+    stagedBytes = 0;
+  }
+
+  /**
+   * Sets the buffer for the stream. The intermediate array is created on the
+   * first call only, sized to half the buffer capacity but at most 8192.
+   *
+   * @param buffer byte buffer to hold within.
+   */
+  public void setBuffer(ByteBuffer buffer) {
+    this.buffer = buffer;
+    this.stagedBytes = 0;
+    final int stagingSize = Math.min(buffer.capacity() / 2, 8192);
+    if (null == staging) {
+      staging = new byte[stagingSize];
+    }
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    final byte value = (byte) (b & 0xFF);
+    if (stagedBytes >= staging.length - 1) {
+      // No room for a direct store; go through the array-based write.
+      this.singleByte[0] = value;
+      write(this.singleByte);
+      return;
+    }
+    staging[stagedBytes++] = value;
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    // A request at least as large as the intermediate array is written
+    // directly, after draining whatever was collected before it.
+    final boolean oversized = len >= staging.length;
+    final boolean lacksRoom = len > staging.length - stagedBytes;
+    if (oversized || lacksRoom) {
+      flushBuffer();
+    }
+    if (oversized) {
+      writeInternal(b, off, len);
+      return;
+    }
+    System.arraycopy(b, off, staging, stagedBytes, len);
+    stagedBytes += len;
+  }
+
+  /**
+   * Puts the bytes into the byte buffer, unless they do not fit and
+   * {@link #onBufferFull(byte[], int, int)} returns false.
+   */
+  private void writeInternal(byte[] b, int off, int len) throws IOException {
+    if (len > buffer.remaining() && !onBufferFull(b, off, len)) {
+      return;
+    }
+    buffer.put(b, off, len);
+  }
+
+  /**
+   * Action to take when the data to be written exceeds the remaining space of
+   * the byte buffer inside.
+   *
+   * @return true if the data should still be put into the byte buffer, false
+   *         to skip the put.
+   * @throws IOException
+   */
+  protected boolean onBufferFull(byte[] b, int off, int len) throws IOException {
+    return true;
+  }
+
+  @Override
+  public void flush() throws IOException {
+    flushBuffer();
+    onFlush();
+  }
+
+  /**
+   * Called by {@link #flush()} after the intermediate array has been drained.
+   *
+   * @throws IOException
+   */
+  protected void onFlush() throws IOException {
+
+  }
+
+  /** Drains the intermediate array into the byte buffer. */
+  private void flushBuffer() throws IOException {
+    if (stagedBytes <= 0) {
+      return;
+    }
+    writeInternal(staging, 0, stagedBytes);
+    stagedBytes = 0;
+  }
+
+  public ByteBuffer getBuffer() {
+    return this.buffer;
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.util.ReflectionUtils;
+
+/**
+ * A data processor that inserts a combining stage before spilled data is
+ * written. When a combiner is configured, the messages of a spilled byte
+ * buffer are combined and the combined message is passed on to the base class
+ * handle method; otherwise the buffer is passed on unchanged.
+ * TODO: Experiment the feature with pipelining design.
+ */
+public class CombineSpilledDataProcessor<M extends Writable> extends
+    WriteSpilledDataProcessor {
+
+  public static Log LOG = LogFactory.getLog(CombineSpilledDataProcessor.class);
+
+  Combiner<M> combiner;
+  M writableObject;
+  ReusableByteBuffer<M> iterator;
+  DirectByteBufferOutputStream combineOutputBuffer;
+  ByteBuffer combineBuffer;
+
+  public CombineSpilledDataProcessor(String fileName)
+      throws FileNotFoundException {
+    super(fileName);
+  }
+
+  /**
+   * Initializes the base class, then instantiates the combiner and the
+   * reusable message object named in the configuration and allocates the
+   * buffer that receives the combined output.
+   *
+   * @return false if the base class initialization fails or the message class
+   *         cannot be found; true otherwise, including when no combiner is
+   *         configured.
+   */
+  @Override
+  public boolean init(Configuration conf) {
+    if (!super.init(conf)) {
+      return false;
+    }
+
+    final String combinerClassName = conf.get(Constants.COMBINER_CLASS);
+    if (null == combinerClassName) {
+      return true;
+    }
+    try {
+      combiner = ReflectionUtils.newInstance(combinerClassName);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+    final String messageClassName = conf.get(Constants.MESSAGE_CLASS);
+    if (null != messageClassName) {
+      try {
+        writableObject = ReflectionUtils.newInstance(messageClassName);
+        iterator = new ReusableByteBuffer<M>(writableObject);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Error combining the records.", e);
+        return false;
+      }
+    }
+
+    combineOutputBuffer = new DirectByteBufferOutputStream();
+    combineBuffer = ByteBuffer.allocateDirect(Constants.BUFFER_DEFAULT_SIZE);
+    combineOutputBuffer.setBuffer(combineBuffer);
+    return true;
+  }
+
+  /**
+   * Combines the messages of the given buffer and hands the combined message
+   * to the base class. Without a combiner or a message object the buffer is
+   * handed to the base class as is.
+   *
+   * @return false if any step fails with an IOException, otherwise the result
+   *         of the base class handle method.
+   */
+  @Override
+  public boolean handleSpilledBuffer(SpilledByteBuffer buffer) {
+
+    final boolean combiningEnabled = combiner != null && writableObject != null;
+    if (!combiningEnabled) {
+      return super.handleSpilledBuffer(buffer);
+    }
+
+    // The message that is logged depends on the step that was in progress.
+    String failure = "Error setting buffer for combining data";
+    try {
+      iterator.set(buffer);
+      Writable combinedMessage = combiner.combine(iterator);
+      failure = "Error preparing for next buffer.";
+      iterator.prepareForNext();
+      failure = "Error writing the combiner output.";
+      combinedMessage.write(this.combineOutputBuffer);
+      failure = "Error flushing the combiner output.";
+      this.combineOutputBuffer.flush();
+    } catch (IOException e) {
+      LOG.error(failure, e);
+      return false;
+    }
+
+    // Switch the output buffer to reading so the base class can consume it.
+    final ByteBuffer combined = this.combineOutputBuffer.getBuffer();
+    combined.flip();
+    try {
+      return super.handleSpilledBuffer(new SpilledByteBuffer(combined,
+          combined.remaining()));
+    } finally {
+      this.combineOutputBuffer.clear();
+    }
+  }
+
+  /**
+   * Always reports success.
+   * NOTE(review): this does not delegate to the base class close; confirm
+   * that nothing opened by the base class needs to be released here.
+   */
+  @Override
+  public boolean close() {
+    return true;
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link DataInputStream} layered over a {@link ByteBufferInputStream},
+ * forwarding the buffer management calls to it.
+ */
+public class DirectByteBufferInputStream extends DataInputStream implements
+    DataInput {
+
+  public DirectByteBufferInputStream(ByteBufferInputStream in) {
+    super(in);
+  }
+
+  public DirectByteBufferInputStream() {
+    super(new ByteBufferInputStream());
+  }
+
+  /** @return the wrapped stream, cast to its concrete type. */
+  private ByteBufferInputStream source() {
+    return (ByteBufferInputStream) this.in;
+  }
+
+  /** Forwards to {@link ByteBufferInputStream#fillForNext()}. */
+  public void prepareForNext() throws IOException {
+    source().fillForNext();
+  }
+
+  /** Forwards to {@link ByteBufferInputStream#hasDataToRead()}. */
+  public boolean hasDataToRead() {
+    return source().hasDataToRead();
+  }
+
+  /** Forwards to {@link ByteBufferInputStream#hasUnmarkedData()}. */
+  public boolean hasUnmarkData() {
+    return source().hasUnmarkedData();
+  }
+
+  /**
+   * Sets the spilled buffer to read from, passing its mark of the last record
+   * as the count to read and its remaining size as the total.
+   */
+  public void setBuffer(SpilledByteBuffer buff) throws IOException {
+    source().setBuffer(buff.getByteBuffer(), buff.getMarkofLastRecord(),
+        buff.remaining());
+  }
+
+  /**
+   * Sets a plain byte buffer to read from; its remaining size is used both as
+   * the count to read and as the total.
+   */
+  public void setBuffer(ByteBuffer buffer) throws IOException {
+    source().setBuffer(buffer, buffer.remaining(), buffer.remaining());
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link DataOutputStream} layered over a {@link ByteBufferOutputStream},
+ * forwarding the buffer management calls to it.
+ */
+public class DirectByteBufferOutputStream extends DataOutputStream {
+
+  public DirectByteBufferOutputStream() {
+    super(new ByteBufferOutputStream());
+  }
+
+  public DirectByteBufferOutputStream(ByteBufferOutputStream stream) {
+    super(stream);
+  }
+
+  /** @return the wrapped stream, cast to its concrete type. */
+  private ByteBufferOutputStream sink() {
+    return (ByteBufferOutputStream) this.out;
+  }
+
+  /** @return the byte buffer held by the wrapped stream. */
+  public ByteBuffer getBuffer() {
+    return sink().getBuffer();
+  }
+
+  /** Sets the byte buffer on the wrapped stream. */
+  public void setBuffer(ByteBuffer buffer) {
+    sink().setBuffer(buffer);
+  }
+
+  /** Clears the wrapped stream. */
+  public void clear() {
+    sink().clear();
+  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+
+/**
+ * A synchronous i/o stream that is used to write data into and to read back the
+ * written data.
+ */
+public class DualChannelByteBufferStream {
+
+  private DirectByteBufferOutputStream outputBuffer;
+  private SyncFlushByteBufferOutputStream outputStream;
+  private DirectByteBufferInputStream inputBuffer;
+  private SyncReadByteBufferInputStream inputStream;
+
+  private String fileName;
+
+  private ByteBuffer buffer;
+  private boolean outputMode;
+  private boolean inputMode;
+
+  /**
+   * Allocates the shared byte buffer and opens the stream in output mode.
+   * The buffer size and whether it is a direct buffer are read from the
+   * configuration; the spill file name is the configured spill path plus a
+   * random suffix.
+   *
+   * @param conf configuration to read the buffer settings and spill path from
+   */
+  public void init(Configuration conf) {
+
+    boolean directAlloc = conf.getBoolean(Constants.BYTEBUFFER_DIRECT,
+        Constants.BYTEBUFFER_DIRECT_DEFAULT);
+    int size = conf.getInt(Constants.BYTEBUFFER_SIZE,
+        Constants.BUFFER_DEFAULT_SIZE);
+    // Honor the configured flag: a heap buffer when direct allocation is off.
+    buffer = directAlloc ? ByteBuffer.allocateDirect(size)
+        : ByteBuffer.allocate(size);
+    fileName = conf.get(Constants.DATA_SPILL_PATH) + File.separatorChar
+        + new BigInteger(128, new SecureRandom()).toString(32);
+    outputMode = true;
+    outputStream = new SyncFlushByteBufferOutputStream(fileName);
+    outputBuffer = new DirectByteBufferOutputStream(outputStream);
+    outputStream.setBuffer(buffer);
+
+  }
+
+  /** @return the stream to write data into. */
+  public DirectByteBufferOutputStream getOutputStream() {
+    return outputBuffer;
+  }
+
+  /** Closes the output side if it is still in output mode. */
+  public void closeOutput() throws IOException {
+    if (outputMode) {
+      outputBuffer.close();
+    }
+    outputMode = false;
+  }
+
+  /** Closes the input side and then the output side. */
+  public void close() throws IOException {
+    closeInput();
+    closeOutput();
+  }
+
+  /**
+   * Ends output mode and prepares the input stream for reading back what was
+   * written: the output stream is closed, the shared buffer is cleared and
+   * handed to a new input stream.
+   *
+   * @return always true
+   */
+  public boolean prepareRead() throws IOException {
+    outputStream.close();
+    outputMode = false;
+    buffer.clear();
+    inputStream = new SyncReadByteBufferInputStream(outputStream.isSpilled(),
+        fileName);
+    inputBuffer = new DirectByteBufferInputStream(inputStream);
+    inputBuffer.setBuffer(buffer);
+    inputMode = true;
+    return true;
+  }
+
+  /** @return the stream to read back data from; set by {@link #prepareRead()}. */
+  public DirectByteBufferInputStream getInputStream() {
+    return inputBuffer;
+  }
+
+  /** Closes the input side if it is in input mode. */
+  public void closeInput() throws IOException {
+    if (inputMode) {
+      inputBuffer.close();
+    }
+    inputMode = false;
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,78 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A utility class to just hold the byte buffer to read back the written data
+ * or write data to read back. A transfer moves as many bytes as both buffers
+ * allow; when the destination is smaller than the source only a part is
+ * transferred and the number of bytes moved is returned. The open state is
+ * not checked by read or write.
+ */
+public class DuplexByteArrayChannel implements WritableByteChannel,
+    ReadableByteChannel {
+
+  private boolean open;
+  private ByteBuffer buffer;
+
+  DuplexByteArrayChannel() {
+
+  }
+
+  @Override
+  public boolean isOpen() {
+    return open;
+  }
+
+  @Override
+  public void close() throws IOException {
+    open = false;
+  }
+
+  /**
+   * Copies bytes from the held buffer into <code>dst</code>.
+   *
+   * @return the number of bytes copied, which is 0 when either buffer has no
+   *         room or no data left.
+   */
+  @Override
+  public int read(ByteBuffer dst) throws IOException {
+    return transfer(buffer, dst);
+  }
+
+  /**
+   * Copies bytes from <code>src</code> into the held buffer.
+   *
+   * @return the number of bytes copied, which is 0 when either buffer has no
+   *         room or no data left.
+   */
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    return transfer(src, buffer);
+  }
+
+  /**
+   * Moves as many bytes as fit from one buffer to the other, advancing the
+   * position of both.
+   */
+  private static int transfer(ByteBuffer from, ByteBuffer to) {
+    int available = from.remaining();
+    int room = to.remaining();
+    if (available <= room) {
+      // Everything fits: a plain bulk put.
+      to.put(from);
+      return available;
+    }
+    // Only part fits: put a view limited to the free room, then advance the
+    // source position by hand because the view has its own position.
+    ByteBuffer window = from.duplicate();
+    window.limit(window.position() + room);
+    to.put(window);
+    from.position(from.position() + room);
+    return room;
+  }
+
+  /** Sets the buffer to hold and marks the channel as open. */
+  public void setBuffer(ByteBuffer buffer) {
+    this.buffer = buffer;
+    open = true;
+  }
+
+  /** Flips the held buffer, e.g. to read back what was written. */
+  public void flip() {
+    buffer.flip();
+  }
+
+  public ByteBuffer getBuffer() {
+    return buffer;
+  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An {@link Iterable} over the messages stored in a {@link SpilledByteBuffer}.
+ * Every call to the iterator's next fills and returns the same reusable
+ * message object. Only one iterator may be created per buffer that is set.
+ *
+ * @param <M> the message type read from the buffer
+ */
+public class ReusableByteBuffer<M extends Writable> implements Iterable<M> {
+
+  private DirectByteBufferInputStream stream;
+  private SpilledByteBuffer buffer;
+  private boolean isIterStarted;
+
+  private M message;
+
+  /** Iterator that deserializes into a single message object. */
+  private static class ReusableByteBufferIterator<M extends Writable>
+      implements Iterator<M> {
+
+    private ReusableByteBuffer<M> owner;
+    private M record;
+
+    public ReusableByteBufferIterator(ReusableByteBuffer<M> bbuffer, M msg) {
+      this.owner = bbuffer;
+      this.record = msg;
+    }
+
+    /** Rejects use while the owner has no started iteration. */
+    private void requireStarted() {
+      if (owner.isIterStarted) {
+        return;
+      }
+      throw new IllegalStateException(
+          "Iterator should be reinitialized to work with new buffer.");
+    }
+
+    @Override
+    public boolean hasNext() {
+      requireStarted();
+      return owner.stream.hasDataToRead();
+    }
+
+    @Override
+    public M next() {
+      requireStarted();
+      try {
+        record.readFields(this.owner.stream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return record;
+    }
+
+    /** Does nothing. */
+    @Override
+    public void remove() {
+    }
+  }
+
+  /**
+   * @param reusableObject the message object that iterators fill and return
+   */
+  public ReusableByteBuffer(M reusableObject) {
+    stream = new DirectByteBufferInputStream();
+    message = reusableObject;
+  }
+
+  /**
+   * Sets the buffer to iterate over and allows a new iterator to be created.
+   */
+  public void set(SpilledByteBuffer buffer) throws IOException {
+    this.buffer = buffer;
+    stream.setBuffer(this.buffer);
+    isIterStarted = false;
+  }
+
+  /** Replaces the message object handed to iterators created from now on. */
+  public void setReusableObject(M object) {
+    this.message = object;
+  }
+
+  /**
+   * @throws UnsupportedOperationException if an iterator was already created
+   *           for the current buffer
+   */
+  @Override
+  public Iterator<M> iterator() {
+    if (isIterStarted) {
+      throw new UnsupportedOperationException(
+          "Only one iterator creation is allowed.");
+    }
+    isIterStarted = true;
+    return new ReusableByteBufferIterator<M>(this, message);
+  }
+
+  /** Forwards to {@link DirectByteBufferInputStream#prepareForNext()}. */
+  public void prepareForNext() throws IOException {
+    this.stream.prepareForNext();
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.DoubleBuffer;
+import java.nio.FloatBuffer;
+import java.nio.IntBuffer;
+import java.nio.LongBuffer;
+import java.nio.ShortBuffer;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <code>SpilledByteBuffer</code> encapsulates a ByteBuffer. It lets the user to
+ * mark the end of last record written into the buffer.
+ *
+ */
+public class SpilledByteBuffer {
+
+ ByteBuffer buffer;
+ int endOfRecord;
+ Class<? extends Writable> writableClass;
+
+ public SpilledByteBuffer(boolean direct, int size) {
+ if (direct) {
+ buffer = ByteBuffer.allocate(size);
+ } else {
+ buffer = ByteBuffer.allocateDirect(size);
+ }
+ }
+
+ public void setRecordClass(Class<? extends Writable> classObj) {
+ this.writableClass = classObj;
+ }
+
+ public Class<? extends Writable> getRecordClass(){
+ return this.writableClass;
+ }
+
+ public SpilledByteBuffer(ByteBuffer byteBuffer) {
+ this.buffer = byteBuffer;
+ }
+
+ public SpilledByteBuffer(ByteBuffer byteBuffer, int markEnd) {
+ this.buffer = byteBuffer;
+ this.endOfRecord = markEnd;
+ }
+
+ public void markEndOfRecord() {
+ this.endOfRecord = this.buffer.position();
+ }
+
+ public void markEndOfRecord(int pos){
+ if(pos < this.buffer.capacity()){
+ this.endOfRecord = pos;
+ }
+ }
+
+ public int getMarkofLastRecord(){
+ return this.endOfRecord;
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return buffer;
+ }
+
+ public CharBuffer asCharBuffer() {
+ return buffer.asCharBuffer();
+ }
+
+ public DoubleBuffer asDoubleBuffer() {
+ return buffer.asDoubleBuffer();
+ }
+
+ public FloatBuffer asFloatBuffer() {
+ return buffer.asFloatBuffer();
+ }
+
+ public IntBuffer asIntBuffer() {
+ return buffer.asIntBuffer();
+ }
+
+ public LongBuffer asLongBuffer() {
+ return buffer.asLongBuffer();
+ }
+
+ public SpilledByteBuffer asReadOnlyBuffer() {
+ return new SpilledByteBuffer(buffer.asReadOnlyBuffer());
+ }
+
+ public ShortBuffer asShortBuffer() {
+ return buffer.asShortBuffer();
+ }
+
+ public SpilledByteBuffer compact() {
+ buffer.compact();
+ return this;
+ }
+
+ public SpilledByteBuffer duplicate() {
+ buffer.duplicate();
+ return new SpilledByteBuffer(this.buffer, this.endOfRecord);
+ }
+
+ public byte get() {
+ return buffer.get();
+ }
+
+ public byte get(int index) {
+ return buffer.get(index);
+ }
+
+ public char getChar() {
+ return buffer.getChar();
+ }
+
+ public char getChar(int index) {
+ return buffer.getChar(index);
+ }
+
+ public double getDouble() {
+ return buffer.getDouble();
+ }
+
+ public double getDouble(int index) {
+ return buffer.getDouble(index);
+ }
+
+ public float getFloat() {
+ return buffer.getFloat();
+ }
+
+ public float getFloat(int index) {
+ return buffer.getFloat(index);
+ }
+
+ public int getInt() {
+ return buffer.getInt();
+ }
+
+ public int getInt(int index) {
+ return buffer.getInt(index);
+ }
+
+ public long getLong() {
+ return buffer.getLong();
+ }
+
+ public long getLong(int index) {
+ return buffer.getLong();
+ }
+
+ public short getShort() {
+ return buffer.getShort();
+ }
+
+ public short getShort(int index) {
+ return buffer.getShort(index);
+ }
+
+ public SpilledByteBuffer put(byte b) {
+ buffer.put(b);
+ return this;
+ }
+
+ public SpilledByteBuffer put(int index, byte b) {
+ buffer.put(index, b);
+ return this;
+ }
+
+ public SpilledByteBuffer putChar(char value) {
+ buffer.putChar(value);
+ return this;
+ }
+
+ public SpilledByteBuffer putChar(int index, char value) {
+ buffer.putChar(index, value);
+ return this;
+ }
+
+ public SpilledByteBuffer putDouble(double value) {
+ buffer.putDouble(value);
+ return this;
+ }
+
+ public SpilledByteBuffer putDouble(int index, double value) {
+ buffer.putDouble(index, value);
+ return this;
+ }
+
+ public SpilledByteBuffer putFloat(float value) {
+ buffer.putFloat(value);
+ return this;
+ }
+
+ public SpilledByteBuffer putFloat(int index, float value) {
+ buffer.putFloat(index, value);
+ return this;
+ }
+
+ public SpilledByteBuffer putInt(int index, int value) {
+ buffer.putInt(index, value);
+ return this;
+ }
+
+ public SpilledByteBuffer putInt(int value) {
+ buffer.putInt(value);
+ return this;
+ }
+
+ public SpilledByteBuffer putLong(int index, long value) {
+ buffer.putLong(index, value);
+ return this;
+ }
+
+ public SpilledByteBuffer putLong(long value) {
+ buffer.putLong(value);
+ return this;
+ }
+
+ public SpilledByteBuffer putShort(int index, short value) {
+ buffer.putShort(index, value);
+ return this;
+ }
+
+ public SpilledByteBuffer putShort(short value) {
+ buffer.putShort(value);
+ return this;
+ }
+
+ public SpilledByteBuffer slice() {
+ return new SpilledByteBuffer(buffer.slice());
+ }
+
+ public byte[] array() {
+ return buffer.array();
+ }
+
+ public int arrayOffset() {
+ return buffer.arrayOffset();
+ }
+
+ public boolean hasArray() {
+ return buffer.hasArray();
+ }
+
+ public boolean isDirect() {
+ return buffer.isDirect();
+ }
+
+ public boolean isReadOnly() {
+ return buffer.isReadOnly();
+ }
+
+ public void clear() {
+ buffer.clear();
+ }
+
+ public SpilledByteBuffer flip() {
+ buffer.flip();
+ return this;
+ }
+
+ public int remaining() {
+ return buffer.remaining();
+ }
+
+ public void put(byte[] b, int off, int rem) {
+ buffer.put(b, off, rem);
+ }
+
+ public void put(ByteBuffer byteBuffer) {
+ buffer.put(byteBuffer);
+
+ }
+
+ public int capacity(){
+ return this.buffer.capacity();
+ }
+
+ public void get(byte[] b, int off, int readSize) {
+ buffer.get(b, off, readSize);
+
+ }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+/**
+ * A {@link ByteBuffer} stream that synchronously writes the spilled data to
+ * local storage.
+ *
+ */
+public class SyncFlushByteBufferOutputStream extends ByteBufferOutputStream {
+
+ String fileName;
+ FileChannel channel;
+ FileOutputStream stream;
+ private boolean spilled;
+
+ public SyncFlushByteBufferOutputStream(String fileName) {
+ super();
+ this.fileName = fileName;
+ }
+
+ @Override
+ protected boolean onBufferFull(byte[] b, int off, int len) throws IOException {
+ buffer.flip();
+ if (channel == null) {
+ File f = new File(fileName);
+ stream = new FileOutputStream(f, true);
+ channel = stream.getChannel();
+ }
+ channel.write(buffer);
+ channel.write(ByteBuffer.wrap(b, off, len));
+
+ channel.force(true);
+ buffer.clear();
+ spilled = true;
+ return false;
+ }
+
+ @Override
+ protected void onFlush() throws IOException {
+ if (spilled) {
+ buffer.flip();
+ channel.write(buffer);
+ channel.force(true);
+ channel.close();
+ }
+ }
+
+ public boolean isSpilled() {
+ return spilled;
+ }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java Wed Feb 6 19:23:42 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * A {@link ByteBuffer} input stream that synchronously reads from spilled data.
+ * Uses {@link DuplexByteArrayChannel} within.
+ */
+public class SyncReadByteBufferInputStream extends ByteBufferInputStream {
+
+ private static final Log LOG = LogFactory
+ .getLog(SyncReadByteBufferInputStream.class);
+
+ private boolean spilled;
+ private FileChannel fileChannel;
+ private long fileBytesToRead;
+ private long fileBytesRead;
+ private DuplexByteArrayChannel duplexChannel = new DuplexByteArrayChannel();
+
+ public SyncReadByteBufferInputStream(boolean isSpilled, String fileName) {
+ spilled = isSpilled;
+ if (isSpilled) {
+ RandomAccessFile f;
+ try {
+ f = new RandomAccessFile(fileName, "r");
+ fileChannel = f.getChannel();
+ fileBytesToRead = fileChannel.size();
+ } catch (FileNotFoundException e) {
+ LOG.error("File not found initializing Synchronous Input Byte Stream",
+ e);
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ LOG.error("Error initializing Synchronous Input Byte Stream", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+ }
+
+ private void feedDataFromFile() throws IOException {
+ int toReadNow = (int) Math.min(buffer.capacity(), fileBytesToRead);
+ fileChannel.transferTo(fileBytesRead, toReadNow, duplexChannel);
+ fileBytesRead += toReadNow;
+ fileBytesToRead -= toReadNow;
+ duplexChannel.flip();
+ }
+
+ @Override
+ public void setBuffer(ByteBuffer buffer, long toRead, long total)
+ throws IOException {
+ this.buffer = buffer;
+ duplexChannel.setBuffer(buffer);
+ if (spilled) {
+ feedDataFromFile();
+ }
+ super.setBuffer(buffer, fileBytesToRead, fileBytesToRead);
+
+ }
+
+ @Override
+ protected int onBufferRead(byte[] b, int off, int len, int cur)
+ throws IOException {
+
+ if (fileBytesToRead == 0) {
+ return cur == 0 ? -1 : cur;
+ }
+
+ if (spilled) {
+ buffer.clear();
+ feedDataFromFile();
+ }
+ return cur += read(b, off, len);
+ }
+
+}