Posted to commits@spark.apache.org by zs...@apache.org on 2017/09/18 06:15:12 UTC

spark git commit: [SPARK-21113][CORE] Read ahead input stream to amortize disk IO cost …

Repository: spark
Updated Branches:
  refs/heads/master 7c7266208 -> 1e978b17d


[SPARK-21113][CORE] Read ahead input stream to amortize disk IO cost …

Profiling some of our big jobs, we see that around 30% of the time is spent reading spill files from disk. To amortize this disk I/O cost, the idea is to implement a read-ahead input stream which asynchronously reads ahead from the underlying input stream when a specified amount of data has been read from the current buffer. It does so by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read-ahead buffer is used to asynchronously read from the underlying input stream; once the active buffer is exhausted, we flip the two buffers so that we can start reading from the read-ahead buffer without being blocked by disk I/O.
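
Below is a minimal, single-threaded sketch of the buffer-flip idea only (the actual
implementation in this patch fills the read-ahead buffer from a background executor and
guards all state with a lock; see ReadAheadInputStream.java in the diff below):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    public class TwoBufferSketch {
      public static void main(String[] args) throws IOException {
        InputStream underlying = new ByteArrayInputStream("hello, read-ahead".getBytes());
        ByteBuffer active = ByteBuffer.allocate(4);
        ByteBuffer readAhead = ByteBuffer.allocate(4);
        active.flip();      // both buffers start out empty, as in the patch
        readAhead.flip();

        while (true) {
          if (!active.hasRemaining()) {
            // Refill the read-ahead buffer (done asynchronously in the patch),
            // then flip the two buffers so reads continue from fresh data.
            readAhead.clear();
            int n = underlying.read(readAhead.array());
            if (n < 0) break;             // end of stream
            readAhead.limit(n);
            ByteBuffer tmp = active;      // swapBuffers() in the patch
            active = readAhead;
            readAhead = tmp;
          }
          System.out.print((char) (active.get() & 0xFF));
        }
        System.out.println();
      }
    }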

## How was this patch tested?

Tested by running a job on the cluster; we observed up to 8% CPU improvement.

Author: Sital Kedia <sk...@fb.com>
Author: Shixiong Zhu <zs...@gmail.com>
Author: Sital Kedia <si...@users.noreply.github.com>

Closes #18317 from sitalkedia/read_ahead_buffer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e978b17
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e978b17
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e978b17

Branch: refs/heads/master
Commit: 1e978b17d63d7ba20368057aa4e65f5ef6e87369
Parents: 7c72662
Author: Sital Kedia <sk...@fb.com>
Authored: Sun Sep 17 23:15:08 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Sun Sep 17 23:15:08 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/io/ReadAheadInputStream.java   | 408 +++++++++++++++++++
 .../unsafe/sort/UnsafeSorterSpillReader.java    |  21 +-
 .../spark/io/GenericFileInputStreamSuite.java   | 130 ++++++
 .../io/NioBufferedFileInputStreamSuite.java     | 135 ------
 .../spark/io/NioBufferedInputStreamSuite.java   |  33 ++
 .../spark/io/ReadAheadInputStreamSuite.java     |  33 ++
 6 files changed, 621 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1e978b17/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
new file mode 100644
index 0000000..618bd42
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.apache.spark.util.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
+ * stream when a specified amount of data has been read from the current buffer. It does so by
+ * maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer contains
+ * data which should be returned when a read() call is issued. The read-ahead buffer is used to
+ * asynchronously read from the underlying input stream; once the current active buffer is
+ * exhausted, we flip the two buffers so that we can start reading from the read-ahead buffer
+ * without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private static final Logger logger = LoggerFactory.getLogger(ReadAheadInputStream.class);
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Throwable readException;
+
+  @GuardedBy("stateChangeLock")
+  // whether the close method has been called.
+  private boolean isClosed;
+
+  @GuardedBy("stateChangeLock")
+  // true when the close method will close the underlying input stream. This is valid only if
+  // `isClosed` is true.
+  private boolean isUnderlyingInputStreamBeingClosed;
+
+  @GuardedBy("stateChangeLock")
+  // whether there is a read-ahead task running.
+  private boolean isReading;
+
+  // If the remaining data size in the current buffer is below this threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a <code>ReadAheadInputStream</code> with the specified buffer size and read-ahead
+   * threshold.
+   *
+   * @param inputStream The underlying input stream.
+   * @param bufferSizeInBytes The buffer size.
+   * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
+   *                                  threshold, an async read is triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
+    Preconditions.checkArgument(bufferSizeInBytes > 0,
+        "bufferSizeInBytes should be greater than 0, but the value is " + bufferSizeInBytes);
+    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+            readAheadThresholdInBytes < bufferSizeInBytes,
+        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes, but the" +
+            "value is " + readAheadThresholdInBytes);
+    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+    this.underlyingInputStream = inputStream;
+    activeBuffer.flip();
+    readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+    return (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream);
+  }
+
+  private void checkReadException() throws IOException {
+    if (readAborted) {
+      Throwables.propagateIfPossible(readException, IOException.class);
+      throw new IOException(readException);
+    }
+  }
+
+  /** Read data from underlyingInputStream to readAheadBuffer asynchronously. */
+  private void readAsync() throws IOException {
+    stateChangeLock.lock();
+    final byte[] arr = readAheadBuffer.array();
+    try {
+      if (endOfStream || readInProgress) {
+        return;
+      }
+      checkReadException();
+      readAheadBuffer.position(0);
+      readAheadBuffer.flip();
+      readInProgress = true;
+    } finally {
+      stateChangeLock.unlock();
+    }
+    executorService.execute(new Runnable() {
+
+      @Override
+      public void run() {
+        stateChangeLock.lock();
+        try {
+          if (isClosed) {
+            readInProgress = false;
+            return;
+          }
+          // Flip this so that the close method will not close the underlying input stream when we
+          // are reading.
+          isReading = true;
+        } finally {
+          stateChangeLock.unlock();
+        }
+
+        // Please note that it is safe to release the lock and read into the read-ahead buffer
+        // because one of the following two conditions will hold:
+        // 1. The active buffer has data available to read, so the reader will not read from the
+        //    read-ahead buffer.
+        // 2. This is the first read, or the active buffer is exhausted; in that case the reader
+        //    waits for this async read to complete.
+        // So there is no race condition in either situation.
+        int read = 0;
+        Throwable exception = null;
+        try {
+          while (true) {
+            read = underlyingInputStream.read(arr);
+            if (0 != read) break;
+          }
+        } catch (Throwable ex) {
+          exception = ex;
+          if (ex instanceof Error) {
+            // `readException` may not be reported to the user. Rethrow Error to make sure at
+            // least the user can see the Error in UncaughtExceptionHandler.
+            throw (Error) ex;
+          }
+        } finally {
+          stateChangeLock.lock();
+          if (read < 0 || (exception instanceof EOFException)) {
+            endOfStream = true;
+          } else if (exception != null) {
+            readAborted = true;
+            readException = exception;
+          } else {
+            readAheadBuffer.limit(read);
+          }
+          readInProgress = false;
+          signalAsyncReadComplete();
+          stateChangeLock.unlock();
+          closeUnderlyingInputStreamIfNecessary();
+        }
+      }
+    });
+  }
+
+  private void closeUnderlyingInputStreamIfNecessary() {
+    boolean needToCloseUnderlyingInputStream = false;
+    stateChangeLock.lock();
+    try {
+      isReading = false;
+      if (isClosed && !isUnderlyingInputStreamBeingClosed) {
+        // close method cannot close underlyingInputStream because we were reading.
+        needToCloseUnderlyingInputStream = true;
+      }
+    } finally {
+      stateChangeLock.unlock();
+    }
+    if (needToCloseUnderlyingInputStream) {
+      try {
+        underlyingInputStream.close();
+      } catch (IOException e) {
+        logger.warn(e.getMessage(), e);
+      }
+    }
+  }
+
+  private void signalAsyncReadComplete() {
+    stateChangeLock.lock();
+    try {
+      asyncReadComplete.signalAll();
+    } finally {
+      stateChangeLock.unlock();
+    }
+  }
+
+  private void waitForAsyncReadComplete() throws IOException {
+    stateChangeLock.lock();
+    try {
+      while (readInProgress) {
+        asyncReadComplete.await();
+      }
+    } catch (InterruptedException e) {
+      InterruptedIOException iio = new InterruptedIOException(e.getMessage());
+      iio.initCause(e);
+      throw iio;
+    } finally {
+      stateChangeLock.unlock();
+    }
+    checkReadException();
+  }
+
+  @Override
+  public int read() throws IOException {
+    byte[] oneByteArray = oneByte.get();
+    return read(oneByteArray, 0, 1) == -1 ? -1 : oneByteArray[0] & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] b, int offset, int len) throws IOException {
+    if (offset < 0 || len < 0 || len > b.length - offset) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (len == 0) {
+      return 0;
+    }
+    stateChangeLock.lock();
+    try {
+      return readInternal(b, offset, len);
+    } finally {
+      stateChangeLock.unlock();
+    }
+  }
+
+  /**
+   * Flip the active and read-ahead buffers.
+   */
+  private void swapBuffers() {
+    ByteBuffer temp = activeBuffer;
+    activeBuffer = readAheadBuffer;
+    readAheadBuffer = temp;
+  }
+
+  /**
+   * Internal read function which should only be called from the read() API. The assumption is
+   * that the caller has already acquired the stateChangeLock before calling this function.
+   */
+  private int readInternal(byte[] b, int offset, int len) throws IOException {
+    assert (stateChangeLock.isLocked());
+    if (!activeBuffer.hasRemaining()) {
+      waitForAsyncReadComplete();
+      if (readAheadBuffer.hasRemaining()) {
+        swapBuffers();
+      } else {
+        // Either this is the first read, or the active buffer was skipped.
+        readAsync();
+        waitForAsyncReadComplete();
+        if (isEndOfStream()) {
+          return -1;
+        }
+        swapBuffers();
+      }
+    } else {
+      checkReadException();
+    }
+    len = Math.min(len, activeBuffer.remaining());
+    activeBuffer.get(b, offset, len);
+
+    if (activeBuffer.remaining() <= readAheadThresholdInBytes && !readAheadBuffer.hasRemaining()) {
+      readAsync();
+    }
+    return len;
+  }
+
+  @Override
+  public int available() throws IOException {
+    stateChangeLock.lock();
+    // Make sure we have no integer overflow.
+    try {
+      return (int) Math.min((long) Integer.MAX_VALUE,
+          (long) activeBuffer.remaining() + readAheadBuffer.remaining());
+    } finally {
+      stateChangeLock.unlock();
+    }
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    if (n <= 0L) {
+      return 0L;
+    }
+    stateChangeLock.lock();
+    long skipped;
+    try {
+      skipped = skipInternal(n);
+    } finally {
+      stateChangeLock.unlock();
+    }
+    return skipped;
+  }
+
+  /**
+   * Internal skip function which should only be called from the skip() API. The assumption is
+   * that the caller has already acquired the stateChangeLock before calling this function.
+   */
+  private long skipInternal(long n) throws IOException {
+    assert (stateChangeLock.isLocked());
+    waitForAsyncReadComplete();
+    if (isEndOfStream()) {
+      return 0;
+    }
+    if (available() >= n) {
+      // we can skip from the internal buffers
+      int toSkip = (int) n;
+      if (toSkip <= activeBuffer.remaining()) {
+        // Only skipping from active buffer is sufficient
+        activeBuffer.position(toSkip + activeBuffer.position());
+        if (activeBuffer.remaining() <= readAheadThresholdInBytes
+            && !readAheadBuffer.hasRemaining()) {
+          readAsync();
+        }
+        return n;
+      }
+      // We need to skip from both active buffer and read ahead buffer
+      toSkip -= activeBuffer.remaining();
+      activeBuffer.position(0);
+      activeBuffer.flip();
+      readAheadBuffer.position(toSkip + readAheadBuffer.position());
+      swapBuffers();
+      readAsync();
+      return n;
+    } else {
+      int skippedBytes = available();
+      long toSkip = n - skippedBytes;
+      activeBuffer.position(0);
+      activeBuffer.flip();
+      readAheadBuffer.position(0);
+      readAheadBuffer.flip();
+      long skippedFromInputStream = underlyingInputStream.skip(toSkip);
+      readAsync();
+      return skippedBytes + skippedFromInputStream;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    boolean isSafeToCloseUnderlyingInputStream = false;
+    stateChangeLock.lock();
+    try {
+      if (isClosed) {
+        return;
+      }
+      isClosed = true;
+      if (!isReading) {
+        // Nobody is reading, so we can close the underlying input stream in this method.
+        isSafeToCloseUnderlyingInputStream = true;
+        // Flip this to make sure the read ahead task will not close the underlying input stream.
+        isUnderlyingInputStreamBeingClosed = true;
+      }
+    } finally {
+      stateChangeLock.unlock();
+    }
+
+    try {
+      executorService.shutdownNow();
+      executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      InterruptedIOException iio = new InterruptedIOException(e.getMessage());
+      iio.initCause(e);
+      throw iio;
+    } finally {
+      if (isSafeToCloseUnderlyingInputStream) {
+        underlyingInputStream.close();
+      }
+    }
+  }
+}
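
For reference, a minimal usage sketch of the new class, using only the constructor shown
above (the buffer size and threshold values here are illustrative, not tuning
recommendations; the file path comes from argv):

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.spark.io.NioBufferedFileInputStream;
    import org.apache.spark.io.ReadAheadInputStream;

    public class ReadAheadUsageExample {
      public static void main(String[] args) throws IOException {
        int bufferSize = 1024 * 1024;     // 1 MB per internal buffer (illustrative)
        int threshold = bufferSize / 2;   // issue the async read at 50% remaining
        try (InputStream in = new ReadAheadInputStream(
            new NioBufferedFileInputStream(new File(args[0])), bufferSize, threshold)) {
          byte[] chunk = new byte[8 * 1024];
          long total = 0;
          int n;
          while ((n = in.read(chunk)) != -1) {
            total += n;
          }
          System.out.println("Read " + total + " bytes");
        }
      }
    }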

http://git-wip-us.apache.org/repos/asf/spark/blob/1e978b17/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 9521ab8..1e760b0 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -17,20 +17,20 @@
 
 package org.apache.spark.util.collection.unsafe.sort;
 
-import java.io.*;
-
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
-
 import org.apache.spark.SparkEnv;
 import org.apache.spark.TaskContext;
 import org.apache.spark.io.NioBufferedFileInputStream;
+import org.apache.spark.io.ReadAheadInputStream;
 import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.storage.BlockId;
 import org.apache.spark.unsafe.Platform;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.*;
+
 /**
  * Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
  * of the file format).
@@ -72,10 +72,23 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
       bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
     }
 
+    final double readAheadFraction =
+        SparkEnv.get() == null ? 0.5 :
+             SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5);
+
+    final boolean readAheadEnabled =
+            SparkEnv.get() == null ? false :
+                    SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
+
     final InputStream bs =
         new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
     try {
-      this.in = serializerManager.wrapStream(blockId, bs);
+      if (readAheadEnabled) {
+        this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
+                (int) bufferSizeBytes, (int) (bufferSizeBytes * readAheadFraction));
+      } else {
+        this.in = serializerManager.wrapStream(blockId, bs);
+      }
       this.din = new DataInputStream(this.in);
       numRecords = numRecordsRemaining = din.readInt();
     } catch (IOException e) {
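
The two properties read in this hunk are ordinary Spark conf entries; a small sketch that
simply restates the defaults from the code above (enabled=true, fraction=0.5 of the buffer
size):

    import org.apache.spark.SparkConf;

    public class ReadAheadConfExample {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .set("spark.unsafe.sorter.spill.read.ahead.enabled", "true")
            .set("spark.unsafe.sorter.spill.read.ahead.fraction", "0.5");
        System.out.println(conf.toDebugString());
      }
    }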

http://git-wip-us.apache.org/repos/asf/spark/blob/1e978b17/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java
new file mode 100644
index 0000000..3440e1a
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Generic tests for file-based {@link InputStream} implementations; concrete suites supply
+ * the stream under test.
+ */
+public abstract class GenericFileInputStreamSuite {
+
+  private byte[] randomBytes;
+
+  protected File inputFile;
+
+  protected InputStream inputStream;
+
+  @Before
+  public void setUp() throws IOException {
+    // Create a byte array of size 2 MB with random bytes
+    randomBytes =  RandomUtils.nextBytes(2 * 1024 * 1024);
+    inputFile = File.createTempFile("temp-file", ".tmp");
+    FileUtils.writeByteArrayToFile(inputFile, randomBytes);
+  }
+
+  @After
+  public void tearDown() {
+    inputFile.delete();
+  }
+
+  @Test
+  public void testReadOneByte() throws IOException {
+    for (int i = 0; i < randomBytes.length; i++) {
+      assertEquals(randomBytes[i], (byte) inputStream.read());
+    }
+  }
+
+  @Test
+  public void testReadMultipleBytes() throws IOException {
+    byte[] readBytes = new byte[8 * 1024];
+    int i = 0;
+    while (i < randomBytes.length) {
+      int read = inputStream.read(readBytes, 0, 8 * 1024);
+      for (int j = 0; j < read; j++) {
+        assertEquals(randomBytes[i], readBytes[j]);
+        i++;
+      }
+    }
+  }
+
+  @Test
+  public void testBytesSkipped() throws IOException {
+    assertEquals(1024, inputStream.skip(1024));
+    for (int i = 1024; i < randomBytes.length; i++) {
+      assertEquals(randomBytes[i], (byte) inputStream.read());
+    }
+  }
+
+  @Test
+  public void testBytesSkippedAfterRead() throws IOException {
+    for (int i = 0; i < 1024; i++) {
+      assertEquals(randomBytes[i], (byte) inputStream.read());
+    }
+    assertEquals(1024, inputStream.skip(1024));
+    for (int i = 2048; i < randomBytes.length; i++) {
+      assertEquals(randomBytes[i], (byte) inputStream.read());
+    }
+  }
+
+  @Test
+  public void testNegativeBytesSkippedAfterRead() throws IOException {
+    for (int i = 0; i < 1024; i++) {
+      assertEquals(randomBytes[i], (byte) inputStream.read());
+    }
+    // Skipping negative bytes should essentially be a no-op
+    assertEquals(0, inputStream.skip(-1));
+    assertEquals(0, inputStream.skip(-1024));
+    assertEquals(0, inputStream.skip(Long.MIN_VALUE));
+    assertEquals(1024, inputStream.skip(1024));
+    for (int i = 2048; i < randomBytes.length; i++) {
+      assertEquals(randomBytes[i], (byte) inputStream.read());
+    }
+  }
+
+  @Test
+  public void testSkipFromFileChannel() throws IOException {
+    // Since the buffer is smaller than the skipped bytes, this will guarantee
+    // we skip from underlying file channel.
+    assertEquals(1024, inputStream.skip(1024));
+    for (int i = 1024; i < 2048; i++) {
+      assertEquals(randomBytes[i], (byte) inputStream.read());
+    }
+    assertEquals(256, inputStream.skip(256));
+    assertEquals(256, inputStream.skip(256));
+    assertEquals(512, inputStream.skip(512));
+    for (int i = 3072; i < randomBytes.length; i++) {
+      assertEquals(randomBytes[i], (byte) inputStream.read());
+    }
+  }
+
+  @Test
+  public void testBytesSkippedAfterEOF() throws IOException {
+    assertEquals(randomBytes.length, inputStream.skip(randomBytes.length + 1));
+    assertEquals(-1, inputStream.read());
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1e978b17/core/src/test/java/org/apache/spark/io/NioBufferedFileInputStreamSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/io/NioBufferedFileInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/NioBufferedFileInputStreamSuite.java
deleted file mode 100644
index 2c1a34a..0000000
--- a/core/src/test/java/org/apache/spark/io/NioBufferedFileInputStreamSuite.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.io;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests functionality of {@link NioBufferedFileInputStream}
- */
-public class NioBufferedFileInputStreamSuite {
-
-  private byte[] randomBytes;
-
-  private File inputFile;
-
-  @Before
-  public void setUp() throws IOException {
-    // Create a byte array of size 2 MB with random bytes
-    randomBytes =  RandomUtils.nextBytes(2 * 1024 * 1024);
-    inputFile = File.createTempFile("temp-file", ".tmp");
-    FileUtils.writeByteArrayToFile(inputFile, randomBytes);
-  }
-
-  @After
-  public void tearDown() {
-    inputFile.delete();
-  }
-
-  @Test
-  public void testReadOneByte() throws IOException {
-    InputStream inputStream = new NioBufferedFileInputStream(inputFile);
-    for (int i = 0; i < randomBytes.length; i++) {
-      assertEquals(randomBytes[i], (byte) inputStream.read());
-    }
-  }
-
-  @Test
-  public void testReadMultipleBytes() throws IOException {
-    InputStream inputStream = new NioBufferedFileInputStream(inputFile);
-    byte[] readBytes = new byte[8 * 1024];
-    int i = 0;
-    while (i < randomBytes.length) {
-      int read = inputStream.read(readBytes, 0, 8 * 1024);
-      for (int j = 0; j < read; j++) {
-        assertEquals(randomBytes[i], readBytes[j]);
-        i++;
-      }
-    }
-  }
-
-  @Test
-  public void testBytesSkipped() throws IOException {
-    InputStream inputStream = new NioBufferedFileInputStream(inputFile);
-    assertEquals(1024, inputStream.skip(1024));
-    for (int i = 1024; i < randomBytes.length; i++) {
-      assertEquals(randomBytes[i], (byte) inputStream.read());
-    }
-  }
-
-  @Test
-  public void testBytesSkippedAfterRead() throws IOException {
-    InputStream inputStream = new NioBufferedFileInputStream(inputFile);
-    for (int i = 0; i < 1024; i++) {
-      assertEquals(randomBytes[i], (byte) inputStream.read());
-    }
-    assertEquals(1024, inputStream.skip(1024));
-    for (int i = 2048; i < randomBytes.length; i++) {
-      assertEquals(randomBytes[i], (byte) inputStream.read());
-    }
-  }
-
-  @Test
-  public void testNegativeBytesSkippedAfterRead() throws IOException {
-    InputStream inputStream = new NioBufferedFileInputStream(inputFile);
-    for (int i = 0; i < 1024; i++) {
-      assertEquals(randomBytes[i], (byte) inputStream.read());
-    }
-    // Skipping negative bytes should essential be a no-op
-    assertEquals(0, inputStream.skip(-1));
-    assertEquals(0, inputStream.skip(-1024));
-    assertEquals(0, inputStream.skip(Long.MIN_VALUE));
-    assertEquals(1024, inputStream.skip(1024));
-    for (int i = 2048; i < randomBytes.length; i++) {
-      assertEquals(randomBytes[i], (byte) inputStream.read());
-    }
-  }
-
-  @Test
-  public void testSkipFromFileChannel() throws IOException {
-    InputStream inputStream = new NioBufferedFileInputStream(inputFile, 10);
-    // Since the buffer is smaller than the skipped bytes, this will guarantee
-    // we skip from underlying file channel.
-    assertEquals(1024, inputStream.skip(1024));
-    for (int i = 1024; i < 2048; i++) {
-      assertEquals(randomBytes[i], (byte) inputStream.read());
-    }
-    assertEquals(256, inputStream.skip(256));
-    assertEquals(256, inputStream.skip(256));
-    assertEquals(512, inputStream.skip(512));
-    for (int i = 3072; i < randomBytes.length; i++) {
-      assertEquals(randomBytes[i], (byte) inputStream.read());
-    }
-  }
-
-  @Test
-  public void testBytesSkippedAfterEOF() throws IOException {
-    InputStream inputStream = new NioBufferedFileInputStream(inputFile);
-    assertEquals(randomBytes.length, inputStream.skip(randomBytes.length + 1));
-    assertEquals(-1, inputStream.read());
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1e978b17/core/src/test/java/org/apache/spark/io/NioBufferedInputStreamSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/io/NioBufferedInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/NioBufferedInputStreamSuite.java
new file mode 100644
index 0000000..211b33a
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/io/NioBufferedInputStreamSuite.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.junit.Before;
+
+import java.io.IOException;
+
+/**
+ * Tests functionality of {@link NioBufferedFileInputStream}
+ */
+public class NioBufferedInputStreamSuite extends GenericFileInputStreamSuite {
+
+  @Before
+  public void setUp() throws IOException {
+    super.setUp();
+    inputStream = new NioBufferedFileInputStream(inputFile);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1e978b17/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java b/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java
new file mode 100644
index 0000000..5008f93
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/io/ReadAheadInputStreamSuite.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import org.junit.Before;
+
+import java.io.IOException;
+
+/**
+ * Tests functionality of {@link ReadAheadInputStream}
+ */
+public class ReadAheadInputStreamSuite extends GenericFileInputStreamSuite {
+
+  @Before
+  public void setUp() throws IOException {
+    super.setUp();
+    inputStream = new ReadAheadInputStream(new NioBufferedFileInputStream(inputFile), 8 * 1024, 4 * 1024);
+  }
+}

