You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/02/06 20:23:43 UTC

svn commit: r1443152 - /hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/

Author: surajsmenon
Date: Wed Feb  6 19:23:42 2013
New Revision: 1443152

URL: http://svn.apache.org/viewvc?rev=1443152&view=rev
Log:
Fixing the build after [HAMA-723] final patch.

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferInputStream.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * <code>ByteBufferInputStream</code> is used to read back from
+ * SpilledByteBuffer. The goal of this class is to read data up to the marked
+ * boundary of the last complete record. The remaining data, which is the
+ * partial record in the byte buffer, is read into the intermediate buffer using
+ * {@link ByteBufferInputStream#fillForNext()}. If this stream is to be used as
+ * an intermediate buffer to read from a bigger source of data, say a file, the
+ * function {@link ByteBufferInputStream#onBufferRead(byte[], int, int, int)}
+ * could be used to fill up the ByteBuffer encapsulated by the class.
+ */
+public class ByteBufferInputStream extends InputStream {
+
+  /** Scratch array used by {@link #read()} when the intermediate buffer is empty. */
+  private final byte[] readByte = new byte[1];
+  /** Current capacity of {@link #interBuffer}. */
+  private int interBuffSize;
+  /** Intermediate buffer that all reads are served from. */
+  private byte[] interBuffer;
+  /** Number of unread bytes in {@link #interBuffer}; never negative. */
+  private int dataSizeInInterBuffer;
+  /** Index of the next unread byte in {@link #interBuffer}. */
+  private int curPos;
+
+  /** The byte buffer the intermediate buffer is refilled from. */
+  protected ByteBuffer buffer;
+  /** Bytes up to the end of the last complete record, including carried-over data. */
+  private long toBeRead;
+  /** Bytes handed to callers since the last {@link #setBuffer(ByteBuffer, long, long)}. */
+  private long bytesRead;
+  /** Total bytes of data announced for the current buffer. */
+  private long totalBytes;
+
+  public ByteBufferInputStream() {
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+  }
+
+  /**
+   * @return true if fewer bytes have been read than the count marked as the end
+   *         of the last record.
+   */
+  public boolean hasDataToRead() {
+    return ((toBeRead - bytesRead) > 0);
+  }
+
+  /**
+   * @return true if fewer bytes have been read than the total announced for the
+   *         current buffer.
+   */
+  public boolean hasUnmarkedData() {
+    return ((totalBytes - bytesRead) > 0);
+  }
+
+  /**
+   * Reads a single byte.
+   * 
+   * @return the byte as an unsigned value, or -1 when no more data is
+   *         available.
+   * @throws IOException
+   */
+  @Override
+  public int read() throws IOException {
+    if (dataSizeInInterBuffer > 0) {
+      --dataSizeInInterBuffer;
+      ++bytesRead;
+      return interBuffer[curPos++] & 0xFF;
+    }
+
+    // Anything other than one byte read means the scratch array was not
+    // filled, so its (stale) content must not be returned.
+    if (read(readByte, 0, 1) <= 0) {
+      return -1;
+    }
+    return readByte[0] & 0xFF;
+  }
+
+  /**
+   * Reads up to <code>len</code> bytes into <code>b</code>, refilling the
+   * intermediate buffer from the underlying byte buffer as needed.
+   * 
+   * @param b destination array
+   * @param off offset in <code>b</code> to start writing at
+   * @param len maximum number of bytes to read
+   * @return the number of bytes read, 0 if <code>len</code> is 0, or -1 if no
+   *         byte could be read because the data is exhausted.
+   * @throws IOException
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (len == 0) {
+      return 0;
+    }
+    if (dataSizeInInterBuffer >= len) {
+      // fast path: the request is fully served from the intermediate buffer
+      System.arraycopy(interBuffer, curPos, b, off, len);
+      dataSizeInInterBuffer -= len;
+      curPos += len;
+      bytesRead += len;
+      return len;
+    }
+    if (interBuffer == null) {
+      // setBuffer() was never called, so there is nothing to read from
+      return -1;
+    }
+    int size = 0;
+
+    while (len > 0) {
+      if (dataSizeInInterBuffer == 0) {
+        int filled = readInternal(interBuffer, 0, interBuffer.length);
+        curPos = 0;
+        if (filled <= 0) {
+          // Keep dataSizeInInterBuffer at 0: storing the -1 end marker would
+          // turn into a negative copy length on the next call.
+          break;
+        }
+        dataSizeInInterBuffer = filled;
+      }
+      int readSize = Math.min(dataSizeInInterBuffer, len);
+      System.arraycopy(interBuffer, curPos, b, off, readSize);
+      len -= readSize;
+      off += readSize;
+      size += readSize;
+      dataSizeInInterBuffer -= readSize;
+      curPos += readSize;
+    }
+    bytesRead += size;
+    // InputStream contract: signal exhaustion with -1, never with 0, so that
+    // callers such as DataInputStream.readFully terminate.
+    return (size > 0) ? size : -1;
+  }
+
+  /**
+   * Copies bytes from the encapsulated byte buffer into <code>b</code>. When
+   * the byte buffer runs out before <code>len</code> bytes were copied,
+   * {@link #onBufferRead(byte[], int, int, int)} decides the result.
+   * 
+   * @param b destination array
+   * @param off offset in <code>b</code> to start writing at
+   * @param len number of bytes requested
+   * @return the number of bytes copied, or -1 if no byte buffer is set;
+   *         otherwise whatever {@link #onBufferRead(byte[], int, int, int)}
+   *         returns once the byte buffer is drained.
+   * @throws IOException
+   */
+  public int readInternal(byte[] b, int off, int len) throws IOException {
+    if (buffer == null) {
+      return -1;
+    }
+    int cur = 0;
+    while (len > 0) {
+      int rem = buffer.remaining();
+      if (rem == 0) {
+        return onBufferRead(b, off, len, cur);
+      }
+      int readSize = Math.min(rem, len);
+      buffer.get(b, off, readSize);
+      len -= readSize;
+      off += readSize;
+      cur += readSize;
+    }
+    return cur;
+  }
+
+  /**
+   * When the byte buffer encapsulated is out of data then this function is
+   * invoked.
+   * 
+   * @param b the byte array to read into
+   * @param off offset index to start writing
+   * @param len length of data still to be written
+   * @param cur The current size already read by the class.
+   * @return if the end of the stream has reached, and cur is 0 return -1; else
+   *         return the data size currently read.
+   * @throws IOException
+   */
+  protected int onBufferRead(byte[] b, int off, int len, int cur)
+      throws IOException {
+    if (cur != 0)
+      return cur;
+    else
+      return -1;
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  /**
+   * Sets the byte buffer to read the data from. Data still held in the
+   * intermediate buffer (see {@link #fillForNext()}) is kept and counted on
+   * top of <code>toRead</code>.
+   * 
+   * @param buffer The byte buffer to read data from
+   * @param toRead Number of bytes till end of last record.
+   * @param total Total bytes of data to read in the buffer.
+   * @throws IOException
+   */
+  public void setBuffer(ByteBuffer buffer, long toRead, long total)
+      throws IOException {
+    this.buffer = buffer;
+    toBeRead = toRead + dataSizeInInterBuffer;
+    totalBytes = total;
+    bytesRead = 0;
+    // A zero-length intermediate buffer (first buffer was empty) can never
+    // carry data, so it is re-created just like a missing one.
+    if (interBuffer == null || interBuffer.length == 0) {
+      interBuffSize = Math.min(buffer.remaining(), 8192);
+      interBuffer = new byte[interBuffSize];
+      dataSizeInInterBuffer = 0;
+    }
+    fetchIntermediate();
+  }
+
+  /** Tops up the free tail of the intermediate buffer from the byte buffer. */
+  private void fetchIntermediate() throws IOException {
+    int readSize = readInternal(interBuffer, dataSizeInInterBuffer,
+        interBuffer.length - dataSizeInInterBuffer);
+    if (readSize > 0) {
+      dataSizeInInterBuffer += readSize;
+    }
+    curPos = 0;
+  }
+
+  /**
+   * This function should be called to provision reading the partial records
+   * into the buffer after the last record in the buffer is read. This data
+   * would be appended with the next ByteBuffer that is set using
+   * {@link ByteBufferInputStream#setBuffer(ByteBuffer, long, long)} to start
+   * reading records.
+   * 
+   * @throws IOException
+   */
+  public void fillForNext() throws IOException {
+
+    int remainingBytes = buffer.remaining();
+    if (curPos != 0) {
+      // move the unread bytes to the front of the intermediate buffer
+      System.arraycopy(interBuffer, curPos, interBuffer, 0,
+          dataSizeInInterBuffer);
+    }
+    curPos = 0;
+    if (remainingBytes == 0)
+      return;
+
+    if (dataSizeInInterBuffer + remainingBytes > interBuffSize) {
+      // grow so that the unread bytes plus the rest of the byte buffer fit
+      interBuffSize = dataSizeInInterBuffer + remainingBytes;
+      byte[] arr = this.interBuffer;
+      this.interBuffer = new byte[interBuffSize];
+      System.arraycopy(arr, 0, interBuffer, 0, dataSizeInInterBuffer);
+    }
+    int readSize = readInternal(this.interBuffer, dataSizeInInterBuffer,
+        remainingBytes);
+    if (readSize > 0)
+      dataSizeInInterBuffer += readSize;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ByteBufferOutputStream.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ByteBufferOutputStream encapsulates a byte buffer to write data into. Small
+ * writes are collected in an intermediate array and moved to the byte buffer
+ * in bulk. The function
+ * {@link ByteBufferOutputStream#onBufferFull(byte[], int, int)} should be
+ * overridden to handle the case when the size of data exceeds the remaining
+ * space of the byte buffer. The default implementation returns
+ * <code>true</code>, so the data is still put into the byte buffer and
+ * {@link java.nio.BufferOverflowException} is raised by the buffer.
+ */
+class ByteBufferOutputStream extends OutputStream {
+
+  /** Scratch array used by {@link #write(int)} once the intermediate array is full. */
+  private final byte[] b = new byte[1];
+  /** Intermediate array collecting small writes; allocated on the first setBuffer call. */
+  private byte[] interBuffer;
+  /** Number of pending bytes in {@link #interBuffer}. */
+  private int interBufferDataSize;
+  protected ByteBuffer buffer;
+
+  /**
+   * Clears the byte buffer, if one is set, and discards any bytes pending in
+   * the intermediate array.
+   */
+  public void clear() {
+    if (this.buffer != null) {
+      this.buffer.clear();
+    }
+    interBufferDataSize = 0;
+  }
+
+  /**
+   * Sets the buffer for the stream. Pending bytes of the intermediate array
+   * are discarded. The intermediate array is sized on the first call only
+   * (half the buffer capacity, at most 8192 bytes) and reused afterwards.
+   * 
+   * @param buffer byte buffer to hold within.
+   */
+  public void setBuffer(ByteBuffer buffer) {
+    this.buffer = buffer;
+    this.interBufferDataSize = 0;
+    int interSize = Math.min(buffer.capacity()/2, 8192);
+    if(interBuffer == null){
+      interBuffer = new byte[interSize];
+    }
+    
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    // Use every slot of the intermediate array; only a completely full array
+    // takes the flushing path below.
+    if (interBufferDataSize < interBuffer.length) {
+      interBuffer[interBufferDataSize++] = (byte) (b & 0xFF);
+      return;
+    }
+
+    this.b[0] = (byte) (b & 0xFF);
+    write(this.b);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    if (len >= interBuffer.length) {
+      /*
+       * If the request length exceeds the size of the output buffer, flush the
+       * output buffer and then write the data directly. In this way buffered
+       * streams will cascade harmlessly.
+       */
+      flushBuffer();
+      writeInternal(b, off, len);
+      return;
+    }
+    if (len > interBuffer.length - interBufferDataSize) {
+      // not enough room left: push out the pending bytes first
+      flushBuffer();
+    }
+    System.arraycopy(b, off, interBuffer, interBufferDataSize, len);
+    interBufferDataSize += len;
+  }
+
+  /**
+   * Puts the data into the byte buffer. When the data does not fit,
+   * {@link #onBufferFull(byte[], int, int)} decides whether the put is still
+   * attempted.
+   */
+  private void writeInternal(byte[] b, int off, int len) throws IOException {
+
+    if (len <= buffer.remaining() || onBufferFull(b, off, len)) {
+      buffer.put(b, off, len);
+    }
+  }
+
+  /**
+   * Action to take when the data to be written exceeds the remaining space of
+   * the byte buffer inside.
+   * 
+   * @param b array holding the data that did not fit
+   * @param off offset of the data in <code>b</code>
+   * @param len length of the data
+   * @return <code>true</code> if the data should still be put into the byte
+   *         buffer after this call, <code>false</code> if the put must be
+   *         skipped.
+   * @throws IOException
+   */
+  protected boolean onBufferFull(byte[] b, int off, int len) throws IOException {
+    return true;
+  }
+
+  /**
+   * Moves the pending bytes into the byte buffer and then invokes
+   * {@link #onFlush()}.
+   */
+  @Override
+  public void flush() throws IOException {
+    flushBuffer();
+    onFlush();
+  }
+
+  /**
+   * Called at the end of every {@link #flush()}, after the intermediate array
+   * has been written to the byte buffer. Does nothing by default.
+   * 
+   * @throws IOException
+   */
+  protected void onFlush() throws IOException {
+
+  }
+
+  /** Flush the internal buffer */
+  private void flushBuffer() throws IOException {
+    if (interBufferDataSize > 0) {
+      writeInternal(interBuffer, 0, interBufferDataSize);
+      interBufferDataSize = 0;
+    }
+  }
+
+  /**
+   * @return the byte buffer currently set on this stream.
+   */
+  public ByteBuffer getBuffer() {
+    return this.buffer;
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/CombineSpilledDataProcessor.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.util.ReflectionUtils;
+
+/**
+ * This data processor adds a stage in between the spillage of data. Based on
+ * the combiner provided it combines the bunch of messages in the byte buffer
+ * and then writes them to the file writer using the base class handle method.
+ * TODO: Experiment the feature with pipelining design.
+ */
+public class CombineSpilledDataProcessor<M extends Writable> extends
+    WriteSpilledDataProcessor {
+
+  public static Log LOG = LogFactory.getLog(CombineSpilledDataProcessor.class);
+
+  /** Combiner instantiated from the configuration; null when none is configured. */
+  Combiner<M> combiner;
+  /** Message instance handed to the iterator; null when no message class is configured. */
+  M writableObject;
+  ReusableByteBuffer<M> iterator;
+  /** Stream the combined message is serialized into. */
+  DirectByteBufferOutputStream combineOutputBuffer;
+  /** Backing buffer of {@link #combineOutputBuffer}. */
+  ByteBuffer combineBuffer;
+
+  public CombineSpilledDataProcessor(String fileName)
+      throws FileNotFoundException {
+    super(fileName);
+  }
+
+  /**
+   * Initializes the base processor and, when a combiner class is configured,
+   * the combiner, the message iterator and the combine output buffer.
+   * 
+   * @param conf configuration to read the combiner and message class from
+   * @return false if the base class initialization fails or the message class
+   *         cannot be found; true otherwise.
+   * @throws RuntimeException if the combiner class cannot be found
+   */
+  @Override
+  public boolean init(Configuration conf) {
+    if (!super.init(conf)) {
+      return false;
+    }
+    String className = conf.get(Constants.COMBINER_CLASS);
+
+    // without a combiner this processor behaves exactly like its base class
+    if (className == null)
+      return true;
+    try {
+      combiner = ReflectionUtils.newInstance(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+    className = conf.get(Constants.MESSAGE_CLASS);
+
+    if (className != null) {
+      try {
+        writableObject = ReflectionUtils.newInstance(className);
+        iterator = new ReusableByteBuffer<M>(writableObject);
+      } catch (ClassNotFoundException e) {
+        LOG.error("Error combining the records.", e);
+        return false;
+      }
+    }
+
+    combineOutputBuffer = new DirectByteBufferOutputStream();
+    combineBuffer = ByteBuffer.allocateDirect(Constants.BUFFER_DEFAULT_SIZE);
+    combineOutputBuffer.setBuffer(combineBuffer);
+    return true;
+  }
+
+  /**
+   * Combines the messages of the spilled buffer into a single message and
+   * hands the serialized result to the base class. Falls back to the base
+   * class behavior when no combiner or message instance is available.
+   * 
+   * @param buffer the spilled buffer holding the messages
+   * @return false if any step fails (the error is logged); otherwise the result
+   *         of the base class handler.
+   */
+  @Override
+  public boolean handleSpilledBuffer(SpilledByteBuffer buffer) {
+
+    if (combiner == null || writableObject == null) {
+      return super.handleSpilledBuffer(buffer);
+    }
+
+    try {
+      try {
+        iterator.set(buffer);
+      } catch (IOException e1) {
+        LOG.error("Error setting buffer for combining data", e1);
+        return false;
+      }
+      Writable combinedMessage = combiner.combine(iterator);
+      try {
+        iterator.prepareForNext();
+      } catch (IOException e1) {
+        LOG.error("Error preparing for next buffer.", e1);
+        return false;
+      }
+      try {
+        combinedMessage.write(this.combineOutputBuffer);
+      } catch (IOException e) {
+        LOG.error("Error writing the combiner output.", e);
+        return false;
+      }
+      try {
+        this.combineOutputBuffer.flush();
+      } catch (IOException e) {
+        LOG.error("Error flushing the combiner output.", e);
+        return false;
+      }
+      ByteBuffer combined = this.combineOutputBuffer.getBuffer();
+      combined.flip();
+      return super.handleSpilledBuffer(new SpilledByteBuffer(combined,
+          combined.remaining()));
+    } finally {
+      // Reset on every exit path so that partially written combiner output of
+      // a failed call never leaks into the next one.
+      this.combineOutputBuffer.clear();
+    }
+  }
+
+  /**
+   * NOTE(review): this does not delegate to the base class close(); if
+   * WriteSpilledDataProcessor releases the spill file there, it is never
+   * released through this class - confirm against the base class.
+   */
+  @Override
+  public boolean close() {
+    return true;
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferInputStream.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link DataInputStream} whose underlying stream is always a
+ * {@link ByteBufferInputStream}; the extra methods forward to that stream.
+ */
+public class DirectByteBufferInputStream extends DataInputStream implements
+    DataInput {
+
+  public DirectByteBufferInputStream(ByteBufferInputStream in) {
+    super(in);
+  }
+
+  public DirectByteBufferInputStream() {
+    super(new ByteBufferInputStream());
+  }
+
+  /** @return the wrapped stream, cast to its concrete type. */
+  private ByteBufferInputStream source() {
+    return (ByteBufferInputStream) this.in;
+  }
+
+  /**
+   * Forwards to {@link ByteBufferInputStream#fillForNext()}.
+   * 
+   * @throws IOException
+   */
+  public void prepareForNext() throws IOException {
+    source().fillForNext();
+  }
+
+  /** Forwards to {@link ByteBufferInputStream#hasDataToRead()}. */
+  public boolean hasDataToRead() {
+    return source().hasDataToRead();
+  }
+
+  /** Forwards to {@link ByteBufferInputStream#hasUnmarkedData()}. */
+  public boolean hasUnmarkData() {
+    return source().hasUnmarkedData();
+  }
+
+  /**
+   * Sets the spilled buffer to read from, passing its byte buffer, the mark of
+   * its last record and its remaining size to the wrapped stream.
+   * 
+   * @param buff the spilled buffer to read from
+   * @throws IOException
+   */
+  public void setBuffer(SpilledByteBuffer buff) throws IOException {
+    ByteBufferInputStream target = source();
+    target.setBuffer(buff.getByteBuffer(), buff.getMarkofLastRecord(),
+        buff.remaining());
+  }
+
+  /**
+   * Sets a plain byte buffer to read from; all of its remaining bytes are
+   * treated as readable.
+   * 
+   * @param buffer the byte buffer to read from
+   * @throws IOException
+   */
+  public void setBuffer(ByteBuffer buffer) throws IOException {
+    ByteBufferInputStream target = source();
+    target.setBuffer(buffer, buffer.remaining(), buffer.remaining());
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DirectByteBufferOutputStream.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * A {@link DataOutputStream} whose underlying stream is always a
+ * {@link ByteBufferOutputStream}; the extra methods forward to that stream.
+ */
+public class DirectByteBufferOutputStream extends DataOutputStream {
+
+  public DirectByteBufferOutputStream() {
+    super(new ByteBufferOutputStream());
+  }
+  
+  public DirectByteBufferOutputStream(ByteBufferOutputStream stream){
+    super(stream);
+  }
+
+  /** @return the wrapped stream, cast to its concrete type. */
+  private ByteBufferOutputStream sink() {
+    return (ByteBufferOutputStream) this.out;
+  }
+
+  /** @return the byte buffer held by the wrapped stream. */
+  public ByteBuffer getBuffer() {
+    return sink().getBuffer();
+  }
+
+  /**
+   * Sets the byte buffer on the wrapped stream.
+   * 
+   * @param buffer byte buffer to write into
+   */
+  public void setBuffer(ByteBuffer buffer) {
+    sink().setBuffer(buffer);
+  }
+
+  /** Forwards to {@link ByteBufferOutputStream#clear()}. */
+  public void clear(){
+    sink().clear();
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DualChannelByteBufferStream.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.security.SecureRandom;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.Constants;
+
+/**
+ * A synchronous i/o stream that is used to write data into and to read back the 
+ * written data.
+ */
+public class DualChannelByteBufferStream {
+
+  private DirectByteBufferOutputStream outputBuffer;
+  private SyncFlushByteBufferOutputStream outputStream;
+  private DirectByteBufferInputStream inputBuffer;
+  private SyncReadByteBufferInputStream inputStream;
+
+  /** Path handed to both the output and the input stream. */
+  private String fileName;
+
+  /** Buffer shared by the output and the input side. */
+  private ByteBuffer buffer;
+  /** True while the output stream is open. */
+  private boolean outputMode;
+  /** True while the input stream is open. */
+  private boolean inputMode;
+
+  /**
+   * Allocates the buffer and opens the output side. The buffer is direct or
+   * heap based depending on {@link Constants#BYTEBUFFER_DIRECT}; the file name
+   * is a random name below {@link Constants#DATA_SPILL_PATH}.
+   * 
+   * @param conf configuration to read buffer type, size and spill path from
+   */
+  public void init(Configuration conf) {
+
+    boolean directAlloc = conf.getBoolean(Constants.BYTEBUFFER_DIRECT,
+        Constants.BYTEBUFFER_DIRECT_DEFAULT);
+    int size = conf.getInt(Constants.BYTEBUFFER_SIZE,
+        Constants.BUFFER_DEFAULT_SIZE);
+    if (directAlloc) {
+      buffer = ByteBuffer.allocateDirect(size);
+    } else {
+      // honor the configuration: a heap buffer when direct allocation is off
+      buffer = ByteBuffer.allocate(size);
+    }
+    fileName = conf.get(Constants.DATA_SPILL_PATH) + File.separatorChar
+        + new BigInteger(128, new SecureRandom()).toString(32);
+    outputMode = true;
+    outputStream = new SyncFlushByteBufferOutputStream(fileName);
+    outputBuffer = new DirectByteBufferOutputStream(outputStream);
+    outputStream.setBuffer(buffer);
+
+  }
+
+  /** @return the stream to write data into. */
+  public DirectByteBufferOutputStream getOutputStream() {
+    return outputBuffer;
+  }
+
+  /**
+   * Closes the output side if it is still open.
+   * 
+   * @throws IOException
+   */
+  public void closeOutput() throws IOException {
+    if (outputMode) {
+      outputBuffer.close();
+    }
+    outputMode = false;
+  }
+  
+  /**
+   * Closes both sides. The output side is closed even when closing the input
+   * side fails.
+   * 
+   * @throws IOException
+   */
+  public void close() throws IOException{
+    try {
+      closeInput();
+    } finally {
+      closeOutput();
+    }
+  }
+
+  /**
+   * Closes the output stream and opens the input side over the same buffer and
+   * file name.
+   * 
+   * @return always true
+   * @throws IOException
+   */
+  public boolean prepareRead() throws IOException {
+    outputStream.close();
+    outputMode = false;
+    // NOTE(review): clear() only resets position and limit, so the whole
+    // capacity is handed to the input side - presumably the input stream
+    // refills it from the file; confirm against SyncReadByteBufferInputStream.
+    buffer.clear();
+    inputStream = new SyncReadByteBufferInputStream(outputStream.isSpilled(),
+        fileName);
+    inputBuffer = new DirectByteBufferInputStream(inputStream);
+    inputBuffer.setBuffer(buffer);
+    inputMode = true;
+    return true;
+  }
+
+  /** @return the stream to read the written data back from; null before {@link #prepareRead()}. */
+  public DirectByteBufferInputStream getInputStream() {
+    return inputBuffer;
+  }
+
+  /**
+   * Closes the input side if it is open.
+   * 
+   * @throws IOException
+   */
+  public void closeInput() throws IOException {
+    if (inputMode) {
+      inputBuffer.close();
+    }
+    inputMode = false;
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/DuplexByteArrayChannel.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,78 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * A utility class to just hold the byte buffer to read back the written data
+ * or write data to read back. Transfers are limited to what the receiving
+ * buffer can take, so an oversized request results in a partial transfer
+ * instead of a buffer overflow. The open state is tracked but not enforced by
+ * {@link #read(ByteBuffer)} and {@link #write(ByteBuffer)}.
+ *
+ */
+public class DuplexByteArrayChannel implements WritableByteChannel,
+    ReadableByteChannel {
+
+  /** True from {@link #setBuffer(ByteBuffer)} until {@link #close()}. */
+  private boolean open;
+  private ByteBuffer buffer;
+  
+  DuplexByteArrayChannel(){
+    
+  }
+
+  @Override
+  public boolean isOpen() {
+    return open;
+  }
+
+  @Override
+  public void close() throws IOException {
+    open = false;
+  }
+
+  /**
+   * Copies bytes from the held buffer into <code>dst</code>.
+   * 
+   * @param dst buffer to receive the data
+   * @return the number of bytes copied; 0 when the held buffer has nothing
+   *         remaining or <code>dst</code> is full.
+   * @throws IOException
+   */
+  @Override
+  public int read(ByteBuffer dst) throws IOException {
+    return transfer(buffer, dst);
+  }
+
+  /**
+   * Copies bytes from <code>src</code> into the held buffer.
+   * 
+   * @param src buffer holding the data to write
+   * @return the number of bytes copied; 0 when the held buffer is full or
+   *         <code>src</code> has nothing remaining.
+   * @throws IOException
+   */
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    return transfer(src, buffer);
+  }
+
+  /**
+   * Moves as many bytes as both buffers allow from <code>from</code> to
+   * <code>to</code>.
+   * 
+   * @return the number of bytes moved
+   */
+  private static int transfer(ByteBuffer from, ByteBuffer to) {
+    int count = Math.min(from.remaining(), to.remaining());
+    if (count == 0) {
+      return 0;
+    }
+    // Narrow the source window temporarily so that the bulk put cannot
+    // overflow the destination.
+    int savedLimit = from.limit();
+    from.limit(from.position() + count);
+    try {
+      to.put(from);
+    } finally {
+      from.limit(savedLimit);
+    }
+    return count;
+  }
+
+  /**
+   * Sets the buffer to hold and marks the channel open.
+   * 
+   * @param buffer the buffer to read from and write to
+   */
+  public void setBuffer(ByteBuffer buffer) {
+    this.buffer = buffer;
+    open = true;
+  }
+
+  /** Flips the held buffer, e.g. to switch from writing to reading. */
+  public void flip() {
+    buffer.flip();
+  }
+  
+  public ByteBuffer getBuffer(){
+    return buffer;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/ReusableByteBuffer.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An {@link Iterable} over the records of a {@link SpilledByteBuffer} that
+ * deserializes every record into one and the same message instance.
+ * <p>
+ * Because a single object is reused, the element handed out by the iterator is
+ * overwritten by the following call to <code>next()</code>. Only one iterator
+ * may be created per buffer; calling {@link #set(SpilledByteBuffer)} re-arms
+ * this object for another iteration and invalidates iterators created before.
+ * 
+ * @param <M> the {@link Writable} type of the records held in the buffer.
+ */
+public class ReusableByteBuffer<M extends Writable> implements Iterable<M> {
+
+  private DirectByteBufferInputStream stream;
+  private SpilledByteBuffer buffer;
+  private boolean isIterStarted;
+
+  private M message;
+
+  /**
+   * Iterator that fills the shared message object from the owner's stream.
+   */
+  private static class ReusableByteBufferIterator<M extends Writable>
+      implements Iterator<M> {
+
+    private final ReusableByteBuffer<M> buffer;
+    private final M message;
+
+    public ReusableByteBufferIterator(ReusableByteBuffer<M> bbuffer, M msg) {
+      this.buffer = bbuffer;
+      this.message = msg;
+    }
+
+    /**
+     * Fails when the owner has been given a new buffer since this iterator was
+     * created.
+     */
+    private void checkStillValid() {
+      if (!buffer.isIterStarted) {
+        throw new IllegalStateException(
+            "Iterator should be reinitialized to work with new buffer.");
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      checkStillValid();
+      return buffer.stream.hasDataToRead();
+    }
+
+    /**
+     * Reads the next record into the shared message object.
+     * 
+     * @return the shared message instance, holding the record just read.
+     * @throws RuntimeException wrapping the {@link IOException} raised while
+     *           deserializing.
+     */
+    @Override
+    public M next() {
+      checkStillValid();
+      try {
+        message.readFields(this.buffer.stream);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      return message;
+    }
+
+    /**
+     * Removal is not supported. Signal that instead of silently doing
+     * nothing, as the {@link Iterator} contract requires.
+     */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException(
+          "Removing records from a ReusableByteBuffer is not supported.");
+    }
+  }
+
+  /**
+   * @param reusableObject the instance every record is deserialized into.
+   */
+  public ReusableByteBuffer(M reusableObject) {
+    stream = new DirectByteBufferInputStream();
+    message = reusableObject;
+  }
+
+  /**
+   * Points this object at a new buffer and allows a fresh iterator to be
+   * created.
+   * 
+   * @param buffer the buffer holding the serialized records.
+   * @throws IOException if the underlying stream rejects the buffer.
+   */
+  public void set(SpilledByteBuffer buffer) throws IOException {
+    this.buffer = buffer;
+    stream.setBuffer(this.buffer);
+    isIterStarted = false;
+  }
+
+  /**
+   * Replaces the instance records are deserialized into. Iterators that were
+   * already created keep using the instance they were created with.
+   * 
+   * @param object the new reusable instance.
+   */
+  public void setReusableObject(M object) {
+    this.message = object;
+  }
+
+  /**
+   * @return the single iterator permitted for the current buffer.
+   * @throws UnsupportedOperationException if an iterator was already created
+   *           since the last call to {@link #set(SpilledByteBuffer)}.
+   */
+  @Override
+  public Iterator<M> iterator() {
+    if (isIterStarted) {
+      throw new UnsupportedOperationException(
+          "Only one iterator creation is allowed.");
+    }
+    isIterStarted = true;
+    return new ReusableByteBufferIterator<M>(this, message);
+  }
+
+  /**
+   * Delegates to the underlying stream's <code>prepareForNext()</code>.
+   * 
+   * @throws IOException if the underlying stream fails.
+   */
+  public void prepareForNext() throws IOException {
+    this.stream.prepareForNext();
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpilledByteBuffer.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.DoubleBuffer;
+import java.nio.FloatBuffer;
+import java.nio.IntBuffer;
+import java.nio.LongBuffer;
+import java.nio.ShortBuffer;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * <code>SpilledByteBuffer</code> encapsulates a {@link ByteBuffer} and lets
+ * the user mark the position at which the last complete record written into
+ * the buffer ends. Optionally the {@link Writable} class of the stored records
+ * can be attached. Apart from that bookkeeping, the methods delegate directly
+ * to the wrapped buffer.
+ */
+public class SpilledByteBuffer {
+
+  ByteBuffer buffer;
+  int endOfRecord;
+  Class<? extends Writable> writableClass;
+
+  /**
+   * Allocates a new backing buffer.
+   * 
+   * @param direct <code>true</code> to allocate a direct buffer,
+   *          <code>false</code> to allocate a heap buffer.
+   * @param size the capacity of the buffer in bytes.
+   */
+  public SpilledByteBuffer(boolean direct, int size) {
+    if (direct) {
+      buffer = ByteBuffer.allocateDirect(size);
+    } else {
+      buffer = ByteBuffer.allocate(size);
+    }
+  }
+
+  /**
+   * @param classObj the class of the records stored in this buffer.
+   */
+  public void setRecordClass(Class<? extends Writable> classObj) {
+    this.writableClass = classObj;
+  }
+
+  /**
+   * @return the record class set earlier, or <code>null</code> if none was
+   *         set.
+   */
+  public Class<? extends Writable> getRecordClass() {
+    return this.writableClass;
+  }
+
+  /**
+   * Wraps an existing buffer; the end-of-record mark starts at 0.
+   * 
+   * @param byteBuffer the buffer to wrap.
+   */
+  public SpilledByteBuffer(ByteBuffer byteBuffer) {
+    this.buffer = byteBuffer;
+  }
+
+  /**
+   * Wraps an existing buffer with a preset end-of-record mark.
+   * 
+   * @param byteBuffer the buffer to wrap.
+   * @param markEnd the end-of-record mark to start with.
+   */
+  public SpilledByteBuffer(ByteBuffer byteBuffer, int markEnd) {
+    this.buffer = byteBuffer;
+    this.endOfRecord = markEnd;
+  }
+
+  /** Records the buffer's current position as the end of the last record. */
+  public void markEndOfRecord() {
+    this.endOfRecord = this.buffer.position();
+  }
+
+  /**
+   * Sets the end-of-record mark to <code>pos</code> if it lies below the
+   * buffer's capacity; other values are silently ignored.
+   * 
+   * @param pos the position to mark.
+   */
+  public void markEndOfRecord(int pos) {
+    // NOTE(review): pos == capacity is ignored here although the no-arg
+    // overload can record it for a completely full buffer - confirm intent.
+    if (pos < this.buffer.capacity()) {
+      this.endOfRecord = pos;
+    }
+  }
+
+  /**
+   * @return the current end-of-record mark.
+   */
+  public int getMarkofLastRecord() {
+    return this.endOfRecord;
+  }
+
+  /**
+   * @return the wrapped buffer itself, not a copy.
+   */
+  public ByteBuffer getByteBuffer() {
+    return buffer;
+  }
+
+  // ---- typed views of the wrapped buffer -----------------------------------
+
+  public CharBuffer asCharBuffer() {
+    return buffer.asCharBuffer();
+  }
+
+  public DoubleBuffer asDoubleBuffer() {
+    return buffer.asDoubleBuffer();
+  }
+
+  public FloatBuffer asFloatBuffer() {
+    return buffer.asFloatBuffer();
+  }
+
+  public IntBuffer asIntBuffer() {
+    return buffer.asIntBuffer();
+  }
+
+  public LongBuffer asLongBuffer() {
+    return buffer.asLongBuffer();
+  }
+
+  /**
+   * @return a new wrapper around a read-only view of the wrapped buffer. The
+   *         end-of-record mark and record class are not carried over.
+   */
+  public SpilledByteBuffer asReadOnlyBuffer() {
+    return new SpilledByteBuffer(buffer.asReadOnlyBuffer());
+  }
+
+  public ShortBuffer asShortBuffer() {
+    return buffer.asShortBuffer();
+  }
+
+  /**
+   * Compacts the wrapped buffer.
+   * 
+   * @return this object.
+   */
+  public SpilledByteBuffer compact() {
+    buffer.compact();
+    return this;
+  }
+
+  /**
+   * @return a new wrapper around a duplicate of the wrapped buffer. The
+   *         duplicate shares the content but has its own position and limit;
+   *         the end-of-record mark is carried over.
+   */
+  public SpilledByteBuffer duplicate() {
+    return new SpilledByteBuffer(this.buffer.duplicate(), this.endOfRecord);
+  }
+
+  // ---- relative and absolute reads -----------------------------------------
+
+  public byte get() {
+    return buffer.get();
+  }
+
+  public byte get(int index) {
+    return buffer.get(index);
+  }
+
+  public char getChar() {
+    return buffer.getChar();
+  }
+
+  public char getChar(int index) {
+    return buffer.getChar(index);
+  }
+
+  public double getDouble() {
+    return buffer.getDouble();
+  }
+
+  public double getDouble(int index) {
+    return buffer.getDouble(index);
+  }
+
+  public float getFloat() {
+    return buffer.getFloat();
+  }
+
+  public float getFloat(int index) {
+    return buffer.getFloat(index);
+  }
+
+  public int getInt() {
+    return buffer.getInt();
+  }
+
+  public int getInt(int index) {
+    return buffer.getInt(index);
+  }
+
+  public long getLong() {
+    return buffer.getLong();
+  }
+
+  /**
+   * Absolute read of a long.
+   * 
+   * @param index the byte index to read from; the position is not changed.
+   * @return the long value at <code>index</code>.
+   */
+  public long getLong(int index) {
+    return buffer.getLong(index);
+  }
+
+  public short getShort() {
+    return buffer.getShort();
+  }
+
+  public short getShort(int index) {
+    return buffer.getShort(index);
+  }
+
+  // ---- relative and absolute writes; each returns this for chaining --------
+
+  public SpilledByteBuffer put(byte b) {
+    buffer.put(b);
+    return this;
+  }
+
+  public SpilledByteBuffer put(int index, byte b) {
+    buffer.put(index, b);
+    return this;
+  }
+
+  public SpilledByteBuffer putChar(char value) {
+    buffer.putChar(value);
+    return this;
+  }
+
+  public SpilledByteBuffer putChar(int index, char value) {
+    buffer.putChar(index, value);
+    return this;
+  }
+
+  public SpilledByteBuffer putDouble(double value) {
+    buffer.putDouble(value);
+    return this;
+  }
+
+  public SpilledByteBuffer putDouble(int index, double value) {
+    buffer.putDouble(index, value);
+    return this;
+  }
+
+  public SpilledByteBuffer putFloat(float value) {
+    buffer.putFloat(value);
+    return this;
+  }
+
+  public SpilledByteBuffer putFloat(int index, float value) {
+    buffer.putFloat(index, value);
+    return this;
+  }
+
+  public SpilledByteBuffer putInt(int index, int value) {
+    buffer.putInt(index, value);
+    return this;
+  }
+
+  public SpilledByteBuffer putInt(int value) {
+    buffer.putInt(value);
+    return this;
+  }
+
+  public SpilledByteBuffer putLong(int index, long value) {
+    buffer.putLong(index, value);
+    return this;
+  }
+
+  public SpilledByteBuffer putLong(long value) {
+    buffer.putLong(value);
+    return this;
+  }
+
+  public SpilledByteBuffer putShort(int index, short value) {
+    buffer.putShort(index, value);
+    return this;
+  }
+
+  public SpilledByteBuffer putShort(short value) {
+    buffer.putShort(value);
+    return this;
+  }
+
+  /**
+   * @return a new wrapper around a slice of the wrapped buffer. The
+   *         end-of-record mark and record class are not carried over.
+   */
+  public SpilledByteBuffer slice() {
+    return new SpilledByteBuffer(buffer.slice());
+  }
+
+  // ---- buffer state and bulk operations ------------------------------------
+
+  public byte[] array() {
+    return buffer.array();
+  }
+
+  public int arrayOffset() {
+    return buffer.arrayOffset();
+  }
+
+  public boolean hasArray() {
+    return buffer.hasArray();
+  }
+
+  public boolean isDirect() {
+    return buffer.isDirect();
+  }
+
+  public boolean isReadOnly() {
+    return buffer.isReadOnly();
+  }
+
+  /** Clears the wrapped buffer; the end-of-record mark is left unchanged. */
+  public void clear() {
+    buffer.clear();
+  }
+
+  /**
+   * Flips the wrapped buffer.
+   * 
+   * @return this object.
+   */
+  public SpilledByteBuffer flip() {
+    buffer.flip();
+    return this;
+  }
+
+  public int remaining() {
+    return buffer.remaining();
+  }
+
+  /**
+   * Bulk write of <code>rem</code> bytes taken from <code>b</code> starting at
+   * <code>off</code>.
+   */
+  public void put(byte[] b, int off, int rem) {
+    buffer.put(b, off, rem);
+  }
+
+  /** Bulk write of the remaining content of <code>byteBuffer</code>. */
+  public void put(ByteBuffer byteBuffer) {
+    buffer.put(byteBuffer);
+  }
+
+  public int capacity() {
+    return this.buffer.capacity();
+  }
+
+  /**
+   * Bulk read of <code>readSize</code> bytes into <code>b</code> starting at
+   * <code>off</code>.
+   */
+  public void get(byte[] b, int off, int readSize) {
+    buffer.get(b, off, readSize);
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncFlushByteBufferOutputStream.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+/**
+ * A {@link ByteBuffer} stream that synchronously writes the spilled data to
+ * local storage. The spill file is opened lazily, in append mode, the first
+ * time the buffer overflows.
+ */
+public class SyncFlushByteBufferOutputStream extends ByteBufferOutputStream {
+
+  String fileName;
+  FileChannel channel;
+  FileOutputStream stream;
+  private boolean spilled;
+
+  /**
+   * @param fileName path of the file that receives spilled data.
+   */
+  public SyncFlushByteBufferOutputStream(String fileName) {
+    super();
+    this.fileName = fileName;
+  }
+
+  /**
+   * Writes the buffered bytes followed by the bytes that did not fit to the
+   * spill file, forces them to storage and clears the buffer for reuse.
+   * 
+   * @param b array holding the bytes that did not fit into the buffer.
+   * @param off offset of the first of those bytes in <code>b</code>.
+   * @param len number of those bytes.
+   * @return always <code>false</code>.
+   */
+  @Override
+  protected boolean onBufferFull(byte[] b, int off, int len) throws IOException {
+    buffer.flip();
+    if (channel == null) {
+      File f = new File(fileName);
+      stream = new FileOutputStream(f, true);
+      channel = stream.getChannel();
+    }
+    writeFully(buffer);
+    writeFully(ByteBuffer.wrap(b, off, len));
+
+    channel.force(true);
+    buffer.clear();
+    spilled = true;
+    return false;
+  }
+
+  /**
+   * If anything was spilled, appends what is left in the buffer to the spill
+   * file, forces it to storage and closes the file. Does nothing when the data
+   * never overflowed the buffer.
+   */
+  @Override
+  protected void onFlush() throws IOException {
+    if (!spilled) {
+      return;
+    }
+    try {
+      buffer.flip();
+      writeFully(buffer);
+      channel.force(true);
+    } finally {
+      // Release the file even when the final write or force fails.
+      channel.close();
+    }
+  }
+
+  /**
+   * @return <code>true</code> if data has been written to the spill file.
+   */
+  public boolean isSpilled() {
+    return spilled;
+  }
+
+  /**
+   * Writes all remaining bytes of <code>data</code> to the spill file; a
+   * single channel write is allowed to be partial.
+   */
+  private void writeFully(ByteBuffer data) throws IOException {
+    while (data.hasRemaining()) {
+      channel.write(data);
+    }
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java?rev=1443152&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SyncReadByteBufferInputStream.java Wed Feb  6 19:23:42 2013
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.io;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+/**
+ * A {@link ByteBuffer} input stream that synchronously reads from spilled data.
+ * Uses {@link DuplexByteArrayChannel} within.
+ */
+public class SyncReadByteBufferInputStream extends ByteBufferInputStream {
+
+  private static final Log LOG = LogFactory
+      .getLog(SyncReadByteBufferInputStream.class);
+
+  private boolean spilled;
+  private FileChannel fileChannel;
+  private long fileBytesToRead;
+  private long fileBytesRead;
+  private DuplexByteArrayChannel duplexChannel = new DuplexByteArrayChannel();
+
+  public SyncReadByteBufferInputStream(boolean isSpilled, String fileName) {
+    spilled = isSpilled;
+    if (isSpilled) {
+      RandomAccessFile f;
+      try {
+        f = new RandomAccessFile(fileName, "r");
+        fileChannel = f.getChannel();
+        fileBytesToRead = fileChannel.size();
+      } catch (FileNotFoundException e) {
+        LOG.error("File not found initializing Synchronous Input Byte Stream",
+            e);
+        throw new RuntimeException(e);
+      } catch (IOException e) {
+        LOG.error("Error initializing Synchronous Input Byte Stream", e);
+        throw new RuntimeException(e);
+      }
+
+    }
+  }
+
+  private void feedDataFromFile() throws IOException {
+    int toReadNow = (int) Math.min(buffer.capacity(), fileBytesToRead);
+    fileChannel.transferTo(fileBytesRead, toReadNow, duplexChannel);
+    fileBytesRead += toReadNow;
+    fileBytesToRead -= toReadNow;
+    duplexChannel.flip();
+  }
+
+  @Override
+  public void setBuffer(ByteBuffer buffer, long toRead, long total)
+      throws IOException {
+    this.buffer = buffer;
+    duplexChannel.setBuffer(buffer);
+    if (spilled) {
+      feedDataFromFile();
+    }
+    super.setBuffer(buffer, fileBytesToRead, fileBytesToRead);
+
+  }
+
+  @Override
+  protected int onBufferRead(byte[] b, int off, int len, int cur)
+      throws IOException {
+
+    if (fileBytesToRead == 0) {
+      return cur == 0 ? -1 : cur;
+    }
+
+    if (spilled) {
+      buffer.clear();
+      feedDataFromFile();
+    }
+    return cur += read(b, off, len);
+  }
+
+}