You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by dw...@apache.org on 2018/08/28 13:03:02 UTC

[1/2] lucene-solr:branch_7x: LUCENE-8468: A ByteBuffer based Directory implementation (and associated classes).

Repository: lucene-solr
Updated Branches:
  refs/heads/branch_7x 4a718dd4b -> 7eb7b90ed
  refs/heads/master a452dd9ce -> f762953aa


LUCENE-8468: A ByteBuffer based Directory implementation (and associated classes).


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/7eb7b90e
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/7eb7b90e
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/7eb7b90e

Branch: refs/heads/branch_7x
Commit: 7eb7b90edf6fc780df6bee58922a714bd665ab34
Parents: 4a718dd
Author: Dawid Weiss <dw...@apache.org>
Authored: Tue Aug 28 15:02:17 2018 +0200
Committer: Dawid Weiss <dw...@apache.org>
Committed: Tue Aug 28 15:02:17 2018 +0200

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |   2 +
 .../lucene/store/ByteArrayIndexInput.java       | 107 +++-
 .../lucene/store/ByteBuffersDataInput.java      | 323 +++++++++++
 .../lucene/store/ByteBuffersDataOutput.java     | 541 +++++++++++++++++++
 .../lucene/store/ByteBuffersDirectory.java      | 275 ++++++++++
 .../lucene/store/ByteBuffersIndexInput.java     | 200 +++++++
 .../lucene/store/ByteBuffersIndexOutput.java    | 171 ++++++
 .../lucene/store/BaseDataOutputTestCase.java    | 181 +++++++
 .../lucene/store/TestByteBuffersDataInput.java  | 206 +++++++
 .../lucene/store/TestByteBuffersDataOutput.java | 157 ++++++
 .../lucene/store/TestByteBuffersDirectory.java  |  86 +++
 11 files changed, 2219 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index ad8e02f..e9d207e 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -99,6 +99,8 @@ Changes in Runtime Behavior:
 
 Improvements
 
+* LUCENE-8468: A ByteBuffer based Directory implementation. (Dawid Weiss)
+
 * LUCENE-8447: Add DISJOINT and WITHIN support to LatLonShape queries. (Nick Knize)
 
 * LUCENE-8440: Add support for indexing and searching Line and Point shapes using LatLonShape encoding (Nick Knize)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
index 6ad6125..9bf5ab2 100644
--- a/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
@@ -14,55 +14,56 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.lucene.store;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Locale;
 
-/** 
- * DataInput backed by a byte array.
- * <b>WARNING:</b> This class omits all low-level checks.
- * @lucene.experimental 
+/**
+ * An {@link IndexInput} backed by a byte array.
+ * 
+ * @lucene.experimental
  */
-public final class ByteArrayIndexInput extends IndexInput {
-
+public final class ByteArrayIndexInput extends IndexInput implements RandomAccessInput {
   private byte[] bytes;
 
+  private final int offset;
+  private final int length;
+
   private int pos;
-  private int limit;
 
   public ByteArrayIndexInput(String description, byte[] bytes) {
+    this(description, bytes, 0, bytes.length);
+  }
+  
+  public ByteArrayIndexInput(String description, byte[] bytes, int offs, int length) {
     super(description);
+    this.offset = offs;
     this.bytes = bytes;
-    this.limit = bytes.length;
+    this.length = length;
+    this.pos = offs;
   }
 
   public long getFilePointer() {
-    return pos;
+    return pos - offset;
   }
   
-  public void seek(long pos) {
-    this.pos = (int) pos;
-  }
-
-  public void reset(byte[] bytes, int offset, int len) {
-    this.bytes = bytes;
-    pos = offset;
-    limit = offset + len;
+  public void seek(long pos) throws EOFException {
+    int newPos = Math.toIntExact(pos + offset);
+    try {
+      if (pos < 0 || pos > length) {
+        throw new EOFException();
+      }
+    } finally {
+      this.pos = newPos;
+    }
   }
 
   @Override
   public long length() {
-    return limit;
-  }
-
-  public boolean eof() {
-    return pos == limit;
-  }
-
-  @Override
-  public void skipBytes(long count) {
-    pos += count;
+    return length;
   }
 
   @Override
@@ -153,9 +154,55 @@ public final class ByteArrayIndexInput extends IndexInput {
 
   @Override
   public void close() {
+    bytes = null;
   }
 
-  public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
-    throw new UnsupportedOperationException();
+  @Override
+  public IndexInput clone() {
+    ByteArrayIndexInput slice = slice("(cloned)" + toString(), 0, length());
+    try {
+      slice.seek(getFilePointer());
+    } catch (EOFException e) {
+      throw new UncheckedIOException(e);
+    }
+    return slice;
+  }
+
+  public ByteArrayIndexInput slice(String sliceDescription, long offset, long length) {
+    if (offset < 0 || length < 0 || offset + length > this.length) {
+      throw new IllegalArgumentException(String.format(Locale.ROOT,
+          "slice(offset=%s, length=%s) is out of bounds: %s",
+          offset, length, this));
+    }
+
+    return new ByteArrayIndexInput(sliceDescription, this.bytes, Math.toIntExact(this.offset + offset), 
+        Math.toIntExact(length));
+  }
+
+  @Override
+  public byte readByte(long pos) throws IOException {
+    return bytes[Math.toIntExact(offset + pos)];
+  }
+
+  @Override
+  public short readShort(long pos) throws IOException {
+    int i = Math.toIntExact(offset + pos);
+    return (short) (((bytes[i]     & 0xFF) << 8) |
+                     (bytes[i + 1] & 0xFF));
+  }
+
+  @Override
+  public int readInt(long pos) throws IOException {
+    int i = Math.toIntExact(offset + pos);
+    return ((bytes[i]     & 0xFF) << 24) | 
+           ((bytes[i + 1] & 0xFF) << 16) | 
+           ((bytes[i + 2] & 0xFF) <<  8) | 
+            (bytes[i + 3] & 0xFF);
+  }
+
+  @Override
+  public long readLong(long pos) throws IOException {
+    return (((long) readInt(pos)) << 32) | 
+             (readInt(pos + 4) & 0xFFFFFFFFL);
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
new file mode 100644
index 0000000..e8418ed
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.EOFException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * A {@link DataInput} implementing {@link RandomAccessInput} and reading data from a
+ * list of {@link ByteBuffer}s.
+ */
+public final class ByteBuffersDataInput extends DataInput implements Accountable, RandomAccessInput {
+  private final ByteBuffer[] blocks;
+  private final int blockBits;
+  private final int blockMask;
+  private final long size;
+  private final long offset;
+
+  private long pos;
+
+  /**
+   * Read data from a set of contiguous buffers. All data buffers except for the last one 
+   * must have an identical remaining number of bytes in the buffer (that is a power of two). The last
+   * buffer can be of an arbitrary remaining length.
+   */
+  public ByteBuffersDataInput(List<ByteBuffer> buffers) {
+    ensureAssumptions(buffers);
+
+    this.blocks = buffers.stream().map(buf -> buf.asReadOnlyBuffer()).toArray(ByteBuffer[]::new);
+
+    if (blocks.length == 1) {
+      this.blockBits = 32;
+      this.blockMask = ~0;
+    } else {
+      final int blockBytes = determineBlockPage(buffers);
+      this.blockBits = Integer.numberOfTrailingZeros(blockBytes);
+      this.blockMask = (1 << blockBits) - 1;
+    }
+
+    this.size = Arrays.stream(blocks).mapToLong(block -> block.remaining()).sum();
+
+    // The initial "position" of this stream is shifted by the position of the first block.
+    this.offset = blocks[0].position();
+    this.pos = offset;
+  }
+
+  public long size() {
+    return size;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    // Return a rough estimation for allocated blocks. Note that we do not make
+    // any special distinction for what the type of buffer is (direct vs. heap-based).
+    return RamUsageEstimator.NUM_BYTES_OBJECT_REF * blocks.length + 
+           Arrays.stream(blocks).mapToLong(buf -> buf.capacity()).sum();
+  }
+
+  @Override
+  public byte readByte() throws EOFException {
+    try {
+      ByteBuffer block = blocks[blockIndex(pos)];
+      byte v = block.get(blockOffset(pos));
+      pos++;
+      return v;
+    } catch (IndexOutOfBoundsException e) {
+      if (pos >= size()) {
+        throw new EOFException();
+      } else {
+        throw e; // Something is wrong.
+      }
+    }
+  }
+
+  /**
+   * Reads exactly {@code len} bytes into the given buffer. The buffer must have
+   * enough remaining limit.
+   * 
+   * If there are fewer than {@code len} bytes in the input, {@link EOFException} 
+   * is thrown. 
+   */
+  public void readBytes(ByteBuffer buffer, int len) throws EOFException {
+    try {
+      while (len > 0) {
+        ByteBuffer block = blocks[blockIndex(pos)].duplicate();
+        int blockOffset = blockOffset(pos);
+        block.position(blockOffset);
+        int chunk = Math.min(len, block.remaining());
+        if (chunk == 0) {
+          throw new EOFException();
+        }
+
+        // Update pos early on for EOF detection on output buffer, then try to get buffer content.
+        pos += chunk;
+        block.limit(blockOffset + chunk);
+        buffer.put(block);
+
+        len -= chunk;
+      }
+    } catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) {
+      if (pos >= size()) {
+        throw new EOFException();
+      } else {
+        throw e; // Something is wrong.
+      }
+    }
+  }
+
+  @Override
+  public void readBytes(byte[] arr, int off, int len) throws EOFException {
+    try {
+      while (len > 0) {
+        ByteBuffer block = blocks[blockIndex(pos)].duplicate();
+        block.position(blockOffset(pos));
+        int chunk = Math.min(len, block.remaining());
+        if (chunk == 0) {
+          throw new EOFException();
+        }
+
+        // Update pos early on for EOF detection, then try to get buffer content.
+        pos += chunk;
+        block.get(arr, off, chunk);
+
+        len -= chunk;
+        off += chunk;
+      }
+    } catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) {
+      if (pos >= size()) {
+        throw new EOFException();
+      } else {
+        throw e; // Something is wrong.
+      }
+    }
+  }
+
+  @Override
+  public byte readByte(long pos) {
+    pos += offset;
+    return blocks[blockIndex(pos)].get(blockOffset(pos));
+  }
+
+  @Override
+  public short readShort(long pos) {
+    long absPos = offset + pos;
+    int blockOffset = blockOffset(absPos);
+    if (blockOffset + Short.BYTES <= blockMask) {
+      return blocks[blockIndex(absPos)].getShort(blockOffset);
+    } else {
+      return (short) ((readByte(pos    ) & 0xFF) << 8 | 
+                      (readByte(pos + 1) & 0xFF));
+    }
+  }
+
+  @Override
+  public int readInt(long pos) {
+    long absPos = offset + pos;
+    int blockOffset = blockOffset(absPos);
+    if (blockOffset + Integer.BYTES <= blockMask) {
+      return blocks[blockIndex(absPos)].getInt(blockOffset);
+    } else {
+      return ((readByte(pos    )       ) << 24 |
+              (readByte(pos + 1) & 0xFF) << 16 |
+              (readByte(pos + 2) & 0xFF) << 8  |
+              (readByte(pos + 3) & 0xFF));
+    }
+  }
+
+  @Override
+  public long readLong(long pos) {
+    long absPos = offset + pos;
+    int blockOffset = blockOffset(absPos);
+    if (blockOffset + Long.BYTES <= blockMask) {
+      return blocks[blockIndex(absPos)].getLong(blockOffset);
+    } else {
+      return (((long) readInt(pos)) << 32) | (readInt(pos + 4) & 0xFFFFFFFFL);
+    }
+  }
+
+  public long position() {
+    return pos - offset;
+  }
+
+  /** Seeks to {@code position}; if it lies past {@link #size()}, parks at the end of input and throws {@link EOFException}. */
+  public void seek(long position) throws EOFException {
+    this.pos = offset + Math.min(position, size()); // pos is absolute: it includes the first block's offset
+    if (position > size()) {
+      throw new EOFException();
+    }
+  }
+  
+  public ByteBuffersDataInput slice(long offset, long length) {
+    if (offset < 0 || length < 0 || offset + length > this.size) {
+      throw new IllegalArgumentException(String.format(Locale.ROOT,
+          "slice(offset=%s, length=%s) is out of bounds: %s",
+          offset, length, this));
+    }
+
+    return new ByteBuffersDataInput(sliceBufferList(Arrays.asList(this.blocks), offset, length));
+  }
+
+  @Override
+  public String toString() {
+    return String.format(Locale.ROOT,
+        "%,d bytes, block size: %,d, blocks: %,d, position: %,d%s",
+        size(),
+        blockSize(),
+        blocks.length,
+        position(),
+        offset == 0 ? "" : String.format(Locale.ROOT, " [offset: %,d]", offset));
+  }
+
+  private final int blockIndex(long pos) {
+    return Math.toIntExact(pos >> blockBits);
+  }  
+
+  private final int blockOffset(long pos) {
+    return (int) pos & blockMask;
+  }  
+  
+  private int blockSize() {
+    return 1 << blockBits;
+  }
+
+  private static final boolean isPowerOfTwo(int v) {
+    return (v & (v - 1)) == 0;
+  }
+
+  private static void ensureAssumptions(List<ByteBuffer> buffers) {
+    if (buffers.isEmpty()) {
+      throw new IllegalArgumentException("Buffer list must not be empty.");
+    }
+
+    if (buffers.size() == 1) {
+      // Special case of just a single buffer, conditions don't apply.
+    } else {
+      final int blockPage = determineBlockPage(buffers);
+      
+      // First buffer decides on block page length.
+      if (!isPowerOfTwo(blockPage)) {
+        throw new IllegalArgumentException("The first buffer must have power-of-two position() + remaining(): 0x"
+            + Integer.toHexString(blockPage));
+      }
+
+      // Every intermediate block (all but the first and the last) must start at position 0 and share the page size.
+      for (int i = 1, last = buffers.size() - 1; i < last; i++) {
+        ByteBuffer buffer = buffers.get(i);
+        if (buffer.position() != 0) {
+          throw new IllegalArgumentException("All buffers except for the first one must have position() == 0: " + buffer);
+        }
+        if (i != last && buffer.remaining() != blockPage) {
+          throw new IllegalArgumentException("Intermediate buffers must share an identical remaining() power-of-two block size: 0x" 
+              + Integer.toHexString(blockPage));
+        }
+      }
+    }
+  }
+
+  static int determineBlockPage(List<ByteBuffer> buffers) {
+    ByteBuffer first = buffers.get(0);
+    final int blockPage = Math.toIntExact((long) first.position() + first.remaining());
+    return blockPage;
+  }
+
+  /** Returns read-only views covering {@code length} bytes that begin {@code offset} bytes past the first buffer's position. */
+  private static List<ByteBuffer> sliceBufferList(List<ByteBuffer> buffers, long offset, long length) {
+    ensureAssumptions(buffers);
+
+    if (buffers.size() == 1) {
+      ByteBuffer single = buffers.get(0).asReadOnlyBuffer();
+      single.position(Math.toIntExact(single.position() + offset));
+      single.limit(Math.toIntExact(length + single.position()));
+      return Arrays.asList(single);
+    }
+
+    // Absolute positions stay in long arithmetic: narrowing absEnd with Math.toIntExact made
+    // every slice that ends at or beyond 2 GB fail with an ArithmeticException.
+    final long absStart = buffers.get(0).position() + offset;
+    final long absEnd = absStart + length;
+    final int blockBytes = ByteBuffersDataInput.determineBlockPage(buffers);
+    final long blockMask = (1L << Integer.numberOfTrailingZeros(blockBytes)) - 1;
+    final int startOffset = Math.toIntExact(absStart & blockMask);
+    final int endOffset = Math.toIntExact(absEnd & blockMask);
+
+    ArrayList<ByteBuffer> cloned =
+      buffers.subList(Math.toIntExact(absStart / blockBytes),
+                      Math.toIntExact(absEnd / blockBytes + (endOffset == 0 ? 0 : 1)))
+        .stream()
+        .map(buf -> buf.asReadOnlyBuffer())
+        .collect(Collectors.toCollection(ArrayList::new));
+    if (endOffset == 0) {
+      // The slice ends on a block boundary: add an empty tail so limit(0) below does not truncate a full block.
+      cloned.add(ByteBuffer.allocate(0));
+    }
+    cloned.get(0).position(startOffset);
+    cloned.get(cloned.size() - 1).limit(endOffset);
+    return cloned;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java
new file mode 100644
index 0000000..8840f21
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.IntConsumer;
+import java.util.function.IntFunction;
+
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.UnicodeUtil;
+
+/**
+ * A {@link DataOutput} storing data in a list of {@link ByteBuffer}s.
+ */
+public final class ByteBuffersDataOutput extends DataOutput implements Accountable {
+  private final static ByteBuffer EMPTY = ByteBuffer.allocate(0);
+  private final static byte [] EMPTY_BYTE_ARRAY = {};
+
+  public final static IntFunction<ByteBuffer> ALLOCATE_BB_ON_HEAP = ByteBuffer::allocate;
+
+  /**
+   * A singleton instance of "no-reuse" buffer strategy.
+   */
+  public final static Consumer<ByteBuffer> NO_REUSE = (bb) -> {
+    throw new RuntimeException("reset() is not allowed on this buffer.");
+  };
+
+  /**
+   * An implementation of a {@link ByteBuffer} allocation and recycling policy.
+   * The blocks are recycled if exactly the same size is requested, otherwise
+   * they're released to be GCed.
+   */
+  public final static class ByteBufferRecycler {
+    private final ArrayDeque<ByteBuffer> reuse = new ArrayDeque<>();
+    private final IntFunction<ByteBuffer> delegate;
+
+    public ByteBufferRecycler(IntFunction<ByteBuffer> delegate) {
+      this.delegate = Objects.requireNonNull(delegate);
+    }
+
+    public ByteBuffer allocate(int size) {
+      while (!reuse.isEmpty()) {
+        ByteBuffer bb = reuse.removeFirst();
+        // If we don't have a buffer of exactly the requested size, discard it.
+        if (bb.remaining() == size) {
+          return bb;
+        }
+      }
+
+      return delegate.apply(size);        
+    }
+
+    public void reuse(ByteBuffer buffer) {
+      buffer.rewind();
+      reuse.addLast(buffer);
+    }
+  }
+
+  public final static int DEFAULT_MIN_BITS_PER_BLOCK = 10; // 1024 B
+  public final static int DEFAULT_MAX_BITS_PER_BLOCK = 26; //   64 MB
+
+  /**
+   * Maximum number of blocks at the current {@link #blockBits} block size
+   * before we increase the block size (and thus decrease the number of blocks).
+   */
+  final static int MAX_BLOCKS_BEFORE_BLOCK_EXPANSION = 100;
+
+  /**
+   * Maximum block size: {@code 2^bits}.
+   */
+  private final int maxBitsPerBlock;
+
+  /**
+   * {@link ByteBuffer} supplier.
+   */
+  private final IntFunction<ByteBuffer> blockAllocate;
+
+  /**
+   * {@link ByteBuffer} recycler on {@link #reset}.
+   */
+  private final Consumer<ByteBuffer> blockReuse;
+
+  /**
+   * Current block size: {@code 2^bits}.
+   */
+  private int blockBits;
+
+  /**
+   * Blocks storing data.
+   */
+  private final ArrayDeque<ByteBuffer> blocks = new ArrayDeque<>();
+
+  /**
+   * The current-or-next write block.
+   */
+  private ByteBuffer currentBlock = EMPTY;
+
+  public ByteBuffersDataOutput(long expectedSize) {
+    this(computeBlockSizeBitsFor(expectedSize), DEFAULT_MAX_BITS_PER_BLOCK, ALLOCATE_BB_ON_HEAP, NO_REUSE);
+  }
+
+  public ByteBuffersDataOutput() {
+    this(DEFAULT_MIN_BITS_PER_BLOCK, DEFAULT_MAX_BITS_PER_BLOCK, ALLOCATE_BB_ON_HEAP, NO_REUSE);
+  }
+
+  public ByteBuffersDataOutput(int minBitsPerBlock, 
+                               int maxBitsPerBlock,
+                               IntFunction<ByteBuffer> blockAllocate,
+                               Consumer<ByteBuffer> blockReuse) {
+    if (minBitsPerBlock < 10 ||
+        minBitsPerBlock > maxBitsPerBlock ||
+        maxBitsPerBlock > 31) {
+      throw new IllegalArgumentException(String.format(Locale.ROOT,
+          "Invalid arguments: %s %s",
+          minBitsPerBlock,
+          maxBitsPerBlock));
+    }
+    this.maxBitsPerBlock = maxBitsPerBlock;
+    this.blockBits = minBitsPerBlock;
+    this.blockAllocate = Objects.requireNonNull(blockAllocate, "Block allocator must not be null.");
+    this.blockReuse = Objects.requireNonNull(blockReuse, "Block reuse must not be null.");
+  }
+
+  @Override
+  public void writeByte(byte b) {
+    if (!currentBlock.hasRemaining()) {
+      appendBlock();
+    }
+    currentBlock.put(b);
+  }
+
+  @Override
+  public void writeBytes(byte[] src, int offset, int length) {
+    assert length >= 0;
+    while (length > 0) {
+      if (!currentBlock.hasRemaining()) {
+        appendBlock();
+      }
+
+      int chunk = Math.min(currentBlock.remaining(), length);
+      currentBlock.put(src, offset, chunk);
+      length -= chunk;
+      offset += chunk;
+    }
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int length) {
+    writeBytes(b, 0, length);
+  }
+
+  public void writeBytes(byte[] b) {
+    writeBytes(b, 0, b.length);
+  }
+
+  public void writeBytes(ByteBuffer buffer) {
+    buffer = buffer.duplicate();
+    int length = buffer.remaining();
+    while (length > 0) {
+      if (!currentBlock.hasRemaining()) {
+        appendBlock();
+      }
+
+      int chunk = Math.min(currentBlock.remaining(), length);
+      buffer.limit(buffer.position() + chunk);
+      currentBlock.put(buffer);
+
+      length -= chunk;
+    }
+  }
+
+  /**
+   * Return a list of read-only views of {@link ByteBuffer} blocks over the 
+   * current content written to the output.
+   */
+  public ArrayList<ByteBuffer> toBufferList() {
+    ArrayList<ByteBuffer> result = new ArrayList<>(Math.max(blocks.size(), 1));
+    if (blocks.isEmpty()) {
+      result.add(EMPTY);
+    } else {
+      for (ByteBuffer bb : blocks) {
+        bb = (ByteBuffer) bb.asReadOnlyBuffer().flip(); // cast for jdk8 (covariant in jdk9+) 
+        result.add(bb);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Returns a list of writeable blocks over the (source) content buffers.
+   * 
+   * This method returns the raw content of source buffers that may change over the lifetime 
+   * of this object (blocks can be recycled or discarded, for example). Most applications 
+   * should favor calling {@link #toBufferList()} which returns a read-only <i>view</i> over 
+   * the content of the source buffers.
+   * 
+   * The difference between {@link #toBufferList()} and {@link #toWriteableBufferList()} is that
+   * read-only view of source buffers will always return {@code false} from {@link ByteBuffer#hasArray()}
+   * (which sometimes may be required to avoid double copying).
+   */
+  public ArrayList<ByteBuffer> toWriteableBufferList() {
+    ArrayList<ByteBuffer> result = new ArrayList<>(Math.max(blocks.size(), 1));
+    if (blocks.isEmpty()) {
+      result.add(EMPTY);
+    } else {
+      for (ByteBuffer bb : blocks) {
+        bb = (ByteBuffer) bb.duplicate().flip(); // cast for jdk8 (covariant in jdk9+) 
+        result.add(bb);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Return a {@link ByteBuffersDataInput} for the set of current buffers ({@link #toBufferList()}). 
+   */
+  public ByteBuffersDataInput toDataInput() {
+    return new ByteBuffersDataInput(toBufferList());
+  }
+
+  /**
+   * Return a contiguous array with the current content written to the output. The returned
+   * array is always a copy (can be mutated).
+   */
+  public byte[] toArrayCopy() {
+    if (blocks.size() == 0) {
+      return EMPTY_BYTE_ARRAY;
+    }
+
+    // We could try to detect single-block, array-based ByteBuffer here
+    // and use Arrays.copyOfRange, but I don't think it's worth the extra
+    // instance checks.
+
+    byte [] arr = new byte[Math.toIntExact(size())];
+    int offset = 0;
+    for (ByteBuffer bb : toBufferList()) {
+      int len = bb.remaining();
+      bb.get(arr, offset, len);
+      offset += len;
+    }
+    return arr;
+  }  
+
+  /** Copy the current content of this object into another {@link DataOutput}. */
+  public void copyTo(DataOutput output) throws IOException {
+    // Iterate writeable duplicates: the read-only views from toBufferList() always return
+    // false from hasArray(), which would leave the array-backed fast path below unreachable.
+    for (ByteBuffer bb : toWriteableBufferList()) {
+      if (bb.hasArray()) {
+        output.writeBytes(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+      } else {
+        output.copyBytes(new ByteBuffersDataInput(Arrays.asList(bb)), bb.remaining());
+      }
+    }
+  }
+
+  /**
+   * @return The number of bytes written to this output so far.
+   */
+  public long size() {
+    final int blockCount = blocks.size();
+    if (blockCount == 0) {
+      return 0;
+    }
+    // Every block but the last is counted as full. Multiply in long arithmetic:
+    // as an int product, (blockCount - 1) * blockSize() overflows past 2 GB of content.
+    final long fullBlocksSize = (blockCount - 1L) * blockSize();
+    return fullBlocksSize + blocks.getLast().position();
+  }
+
+  @Override
+  public String toString() {
+    return String.format(Locale.ROOT,
+        "%,d bytes, block size: %,d, blocks: %,d",
+        size(),
+        blockSize(),
+        blocks.size());
+  }
+
+  // Specialized versions of writeXXX methods that break execution into
+  // fast/slow path if the result would fall on the current block's 
+  // boundary.
+  // 
+  // We also remove the IOException from methods because it (theoretically)
+  // cannot be thrown from byte buffers.
+
+  @Override
+  public void writeShort(short v) {
+    try {
+      if (currentBlock.remaining() >= Short.BYTES) {
+        currentBlock.putShort(v);
+      } else {
+        super.writeShort(v);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public void writeInt(int v) {
+    try {
+      if (currentBlock.remaining() >= Integer.BYTES) {
+        currentBlock.putInt(v);
+      } else {
+        super.writeInt(v);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }    
+  }
+
+  @Override
+  public void writeLong(long v) {
+    try {
+      if (currentBlock.remaining() >= Long.BYTES) {
+        currentBlock.putLong(v);
+      } else {
+        super.writeLong(v);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }    
+  }
+
+  @Override
+  public void writeString(String v) {
+    try {
+      final int MAX_CHARS_PER_WINDOW = 1024;
+      if (v.length() <= MAX_CHARS_PER_WINDOW) {
+        final BytesRef utf8 = new BytesRef(v);
+        writeVInt(utf8.length);
+        writeBytes(utf8.bytes, utf8.offset, utf8.length);
+      } else {
+        writeVInt(UnicodeUtil.calcUTF16toUTF8Length(v, 0, v.length()));
+        final byte [] buf = new byte [UnicodeUtil.MAX_UTF8_BYTES_PER_CHAR * MAX_CHARS_PER_WINDOW];
+        UTF16toUTF8(v, 0, v.length(), buf, (len) -> {
+          writeBytes(buf, 0, len);
+        });
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }    
+  }
+  
+  @Override
+  public void writeMapOfStrings(Map<String, String> map) {
+    try {
+      super.writeMapOfStrings(map);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+  
+  @Override
+  public void writeSetOfStrings(Set<String> set) {
+    try {
+      super.writeSetOfStrings(set);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }      
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    // Return a rough estimation for allocated blocks. Note that we do not make
+    // any special distinction for direct memory buffers.
+    return RamUsageEstimator.NUM_BYTES_OBJECT_REF * blocks.size() + 
+           blocks.stream().mapToLong(buf -> buf.capacity()).sum();
+  }
+
+  /**
+   * This method resets this object to a clean (zero-size) state and
+   * publishes any currently allocated buffers for reuse to the reuse strategy
+   * provided in the constructor.
+   * 
+   * Sharing byte buffers for reads and writes is dangerous and will very likely
+   * lead to hard-to-debug issues, use with great care.
+   */
+  public void reset() {
+    blocks.stream().forEach(blockReuse);
+    blocks.clear();
+    currentBlock = EMPTY;
+  }
+
+  /**
+   * @return Returns a new {@link ByteBuffersDataOutput} with the {@link #reset()} capability. 
+   */
+  // TODO: perhaps we can move it out to a utility class (as a supplier of preconfigured instances?) 
+  public static ByteBuffersDataOutput newResettableInstance() {
+    ByteBuffersDataOutput.ByteBufferRecycler reuser = new ByteBuffersDataOutput.ByteBufferRecycler(
+        ByteBuffersDataOutput.ALLOCATE_BB_ON_HEAP); 
+    return new ByteBuffersDataOutput(
+        ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK,
+        ByteBuffersDataOutput.DEFAULT_MAX_BITS_PER_BLOCK,
+        reuser::allocate,
+        reuser::reuse);
+  }
+
+  /** Returns the block size, in bytes, implied by the current {@code blockBits} value. */
+  private int blockSize() {
+    return 1 << blockBits;
+  }
+
+  /**
+   * Makes a writable block available. Once the number of blocks reaches
+   * {@code MAX_BLOCKS_BEFORE_BLOCK_EXPANSION} (and the block size may still grow), the
+   * content is first rewritten into blocks of twice the size; a new block is allocated
+   * only if the last block has no space left after that.
+   */
+  private void appendBlock() {
+    final boolean expandBlocks =
+        blocks.size() >= MAX_BLOCKS_BEFORE_BLOCK_EXPANSION && blockBits < maxBitsPerBlock;
+    if (expandBlocks) {
+      rewriteToBlockSize(blockBits + 1);
+      if (blocks.getLast().hasRemaining()) {
+        // The rewritten tail still has room, no new block is needed.
+        return;
+      }
+    }
+
+    // Note that blockBits may have just been increased by the rewrite above.
+    final int newBlockSize = blockSize();
+    currentBlock = blockAllocate.apply(newBlockSize);
+    assert currentBlock.capacity() == newBlockSize;
+    blocks.add(currentBlock);
+  }
+
+  private void rewriteToBlockSize(int targetBlockBits) {
+    assert targetBlockBits <= maxBitsPerBlock;
+
+    // We copy over data blocks to an output with one-larger block bit size.
+    // We also discard references to blocks as we're copying to allow GC to
+    // clean up partial results in case of memory pressure.
+    ByteBuffersDataOutput cloned = new ByteBuffersDataOutput(targetBlockBits, targetBlockBits, blockAllocate, NO_REUSE);
+    ByteBuffer block;
+    while ((block = blocks.pollFirst()) != null) {
+      block.flip();
+      cloned.writeBytes(block);
+      if (blockReuse != NO_REUSE) {
+        blockReuse.accept(block);
+      }
+    }
+
+    assert blocks.isEmpty();
+    this.blockBits = targetBlockBits;
+    blocks.addAll(cloned.blocks);
+  }
+
+  private static int computeBlockSizeBitsFor(long bytes) {
+    long powerOfTwo = BitUtil.nextHighestPowerOfTwo(bytes / MAX_BLOCKS_BEFORE_BLOCK_EXPANSION);
+    if (powerOfTwo == 0) {
+      return DEFAULT_MIN_BITS_PER_BLOCK;
+    }
+    
+    int blockBits = Long.numberOfTrailingZeros(powerOfTwo);
+    blockBits = Math.min(blockBits, DEFAULT_MAX_BITS_PER_BLOCK);
+    blockBits = Math.max(blockBits, DEFAULT_MIN_BITS_PER_BLOCK);
+    return blockBits;
+  }
+
+  // TODO: move this block-based conversion to UnicodeUtil.
+  
+  // Shift applied to the high-surrogate start value when computing SURROGATE_OFFSET.
+  private static final long HALF_SHIFT = 10;
+  // Constant added to ((high << 10) + low) by UTF16toUTF8 to turn a surrogate pair
+  // into its code point.
+  private static final int SURROGATE_OFFSET = 
+      Character.MIN_SUPPLEMENTARY_CODE_POINT - 
+      (UnicodeUtil.UNI_SUR_HIGH_START << HALF_SHIFT) - UnicodeUtil.UNI_SUR_LOW_START;
+
+  /**
+   * A consumer-based UTF16-UTF8 encoder that emits the encoded input in chunks.
+   *
+   * <p>Encoded bytes are accumulated at the start of {@code buf}. Before a character is
+   * encoded, if at most four bytes of space remain, {@code bufferFlusher} is called with
+   * the number of valid bytes in {@code buf} and filling restarts from index zero. The
+   * flusher is always called once more at the end (possibly with zero). Unpaired or
+   * out-of-order surrogates are encoded as the three bytes {@code EF BF BD}.
+   *
+   * @param s the source characters
+   * @param offset index of the first character to encode
+   * @param length number of characters to encode
+   * @param buf scratch buffer receiving the encoded bytes
+   * @param bufferFlusher receives the count of valid bytes in {@code buf} at each flush
+   * @return the total number of UTF-8 bytes produced
+   */
+  private static int UTF16toUTF8(final CharSequence s,
+                                final int offset,
+                                final int length,
+                                byte[] buf,
+                                IntConsumer bufferFlusher) {
+    final int end = offset + length;
+    int flushed = 0;
+    int pos = 0;
+    for (int i = offset; i < end; i++) {
+      final int chr = (int) s.charAt(i);
+
+      // A single iteration writes at most four bytes; flush first if space is short.
+      if (pos + 4 >= buf.length) {
+        bufferFlusher.accept(pos);
+        flushed += pos;
+        pos = 0;
+      }
+
+      if (chr < 0x80) {
+        buf[pos++] = (byte) chr;
+      } else if (chr < 0x800) {
+        buf[pos++] = (byte) (0xC0 | (chr >> 6));
+        buf[pos++] = (byte) (0x80 | (chr & 0x3F));
+      } else if (chr < 0xD800 || chr > 0xDFFF) {
+        buf[pos++] = (byte) (0xE0 | (chr >> 12));
+        buf[pos++] = (byte) (0x80 | ((chr >> 6) & 0x3F));
+        buf[pos++] = (byte) (0x80 | (chr & 0x3F));
+      } else {
+        // Surrogate range. Only peek at the next char if this one is a high surrogate
+        // and another char is available; -1 never passes the low-surrogate check.
+        final int low = (chr < 0xDC00 && i < end - 1) ? (int) s.charAt(i + 1) : -1;
+        if (low >= 0xDC00 && low <= 0xDFFF) {
+          // A valid pair: combine into a code point and consume the low surrogate too.
+          final int codePoint = (chr << 10) + low + SURROGATE_OFFSET;
+          i++;
+          buf[pos++] = (byte) (0xF0 | (codePoint >> 18));
+          buf[pos++] = (byte) (0x80 | ((codePoint >> 12) & 0x3F));
+          buf[pos++] = (byte) (0x80 | ((codePoint >> 6) & 0x3F));
+          buf[pos++] = (byte) (0x80 | (codePoint & 0x3F));
+        } else {
+          // Replace unpaired surrogate or out-of-order low surrogate
+          // with substitution character.
+          buf[pos++] = (byte) 0xEF;
+          buf[pos++] = (byte) 0xBF;
+          buf[pos++] = (byte) 0xBD;
+        }
+      }
+    }
+
+    bufferFlusher.accept(pos);
+    return flushed + pos;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java
new file mode 100644
index 0000000..acff5cf
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.zip.CRC32;
+
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.util.BitUtil;
+
+/**
+ * A {@link ByteBuffer}-based {@link Directory} implementation that
+ * can be used to store index files on the heap.
+ *
+ * <p>Important: Note that {@link MMapDirectory} is nearly always a better choice as
+ * it uses OS caches more effectively (through memory-mapped buffers).
+ * A heap-based directory like this one can have the advantage in case of ephemeral, small,
+ * short-lived indexes when disk syncs provide an additional overhead.</p>
+ *
+ * @lucene.experimental
+ */
+public final class ByteBuffersDirectory extends BaseDirectory {
+  /**
+   * Exposes a written file through {@link ByteBuffersDataOutput#toDataInput()},
+   * wrapped in a {@link ByteBuffersIndexInput}.
+   */
+  public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_MANY_BUFFERS = 
+      (fileName, output) -> {
+        ByteBuffersDataInput dataInput = output.toDataInput();
+        String inputName = String.format(Locale.ROOT, "%s (file=%s, buffers=%s)",
+            ByteBuffersIndexInput.class.getSimpleName(),
+            fileName,
+            dataInput.toString());
+        return new ByteBuffersIndexInput(dataInput, inputName);
+      };
+
+  /**
+   * Copies a written file into a single array (via {@link ByteBuffersDataOutput#toArrayCopy()})
+   * and exposes it as a {@link ByteBuffersIndexInput} over one wrapping {@link ByteBuffer}.
+   */
+  public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_ONE_BUFFER = 
+      (fileName, output) -> {
+        ByteBuffersDataInput dataInput = new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(output.toArrayCopy())));
+        String inputName = String.format(Locale.ROOT, "%s (file=%s, buffers=%s)",
+            ByteBuffersIndexInput.class.getSimpleName(),
+            fileName,
+            dataInput.toString());
+        return new ByteBuffersIndexInput(dataInput, inputName);
+      };
+
+  /**
+   * Copies a written file into a single array (via {@link ByteBuffersDataOutput#toArrayCopy()})
+   * and exposes it as a {@link ByteArrayIndexInput}.
+   */
+  public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_BYTE_ARRAY = 
+      (fileName, output) -> {
+        byte[] array = output.toArrayCopy();
+        String inputName = String.format(Locale.ROOT, "%s (file=%s, length=%s)",
+            ByteArrayIndexInput.class.getSimpleName(),
+            fileName,
+            array.length);
+        return new ByteArrayIndexInput(inputName, array, 0, array.length);
+      };
+
+  /**
+   * Exposes a written file through {@link ByteBufferIndexInput}, using the output's
+   * buffer list and a {@link ByteBufferGuard} with a no-op cleaner.
+   */
+  public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_MANY_BUFFERS_LUCENE = 
+      (fileName, output) -> {
+        List<ByteBuffer> bufferList = output.toBufferList();
+        int chunkSizePower;
+        // NOTE(review): an empty trailing buffer is appended here; presumably
+        // ByteBufferIndexInput needs it when the content ends exactly on a chunk
+        // boundary -- confirm against ByteBufferIndexInput.
+        bufferList.add(ByteBuffer.allocate(0));
+        int blockSize = ByteBuffersDataInput.determineBlockPage(bufferList);
+        if (blockSize == 0) {
+          // No block page could be determined; fall back to a fixed chunk size power.
+          chunkSizePower = 30;
+        } else {
+          chunkSizePower = Integer.numberOfTrailingZeros(BitUtil.nextHighestPowerOfTwo(blockSize));
+        }
+
+        String inputName = String.format(Locale.ROOT, "%s (file=%s)",
+            ByteBuffersDirectory.class.getSimpleName(),
+            fileName);
+
+        ByteBufferGuard guard = new ByteBufferGuard("none", (String resourceDescription, ByteBuffer b) -> {});
+        return ByteBufferIndexInput.newInstance(inputName, 
+            bufferList.toArray(new ByteBuffer [bufferList.size()]), 
+            output.size(), chunkSizePower, guard);
+      };
+
+  /**
+   * Generates name parts for temporary files: the given suffix followed by an
+   * underscore and a per-directory counter rendered in radix {@link Character#MAX_RADIX}.
+   */
+  private final Function<String, String> tempFileName = new Function<String, String>() {
+    private final AtomicLong counter = new AtomicLong();
+
+    @Override
+    public String apply(String suffix) {
+      return suffix + "_" + Long.toString(counter.getAndIncrement(), Character.MAX_RADIX);
+    }
+  };
+
+  /** All files of this directory, keyed by file name. */
+  private final ConcurrentHashMap<String, FileEntry> files = new ConcurrentHashMap<>();
+
+  /**
+   * Conversion between a buffered index output and the corresponding index input
+   * for a given file.   
+   */
+  private final BiFunction<String, ByteBuffersDataOutput, IndexInput> outputToInput;
+
+  /**
+   * A supplier of {@link ByteBuffersDataOutput} instances used to buffer up 
+   * the content of written files.
+   */
+  private final Supplier<ByteBuffersDataOutput> bbOutputSupplier;
+
+  /** Creates a directory that uses a new {@link SingleInstanceLockFactory}. */
+  public ByteBuffersDirectory() {
+    this(new SingleInstanceLockFactory());
+  }
+  
+  /**
+   * Creates a directory with the given lock factory, default {@link ByteBuffersDataOutput}
+   * instances and the {@link #OUTPUT_AS_MANY_BUFFERS} conversion.
+   */
+  public ByteBuffersDirectory(LockFactory lockFactory) {
+    this(lockFactory, ByteBuffersDataOutput::new, OUTPUT_AS_MANY_BUFFERS);
+  }
+
+  /**
+   * Creates a fully customized directory.
+   *
+   * @param factory the lock factory passed to {@link BaseDirectory}
+   * @param bbOutputSupplier supplies the output used to buffer each written file; must not be null
+   * @param outputToInput converts a closed output into the input served to readers; must not be null
+   */
+  public ByteBuffersDirectory(LockFactory factory, 
+      Supplier<ByteBuffersDataOutput> bbOutputSupplier,
+      BiFunction<String, ByteBuffersDataOutput, IndexInput> outputToInput) {
+    super(factory);
+    this.outputToInput = Objects.requireNonNull(outputToInput);
+    this.bbOutputSupplier = Objects.requireNonNull(bbOutputSupplier);
+  }
+
+  /** Returns the names of all files, sorted. */
+  @Override
+  public String[] listAll() throws IOException {
+    ensureOpen();
+    return files.keySet().stream().sorted().toArray(String[]::new);
+  }
+
+  /**
+   * Removes the named file.
+   *
+   * @throws FileNotFoundException if no such file exists
+   */
+  @Override
+  public void deleteFile(String name) throws IOException {
+    ensureOpen();
+    FileEntry removed = files.remove(name);
+    if (removed == null) {
+      throw new FileNotFoundException(name);
+    }
+  }
+
+  /**
+   * Returns the length of the named file; this is zero until the file's output is closed.
+   *
+   * @throws FileNotFoundException if no such file exists
+   */
+  @Override
+  public long fileLength(String name) throws IOException {
+    ensureOpen();
+    FileEntry file = files.get(name);
+    if (file == null) {
+      throw new FileNotFoundException(name);
+    }
+    return file.length();
+  }
+
+  /**
+   * Registers a new file and returns the output used to write it.
+   *
+   * @throws FileAlreadyExistsException if a file with this name already exists
+   */
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    ensureOpen();
+    FileEntry e = new FileEntry(name); 
+    if (files.putIfAbsent(name, e) != null) {
+      throw new FileAlreadyExistsException("File already exists: " + name);
+    }
+    return e.createOutput(outputToInput);
+  }
+
+  /**
+   * Registers a new temporary file under a generated, previously unused name
+   * and returns the output used to write it.
+   */
+  @Override
+  public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
+    ensureOpen();
+    // Retry with the next generated name until one is free.
+    while (true) {
+      String name = IndexFileNames.segmentFileName(prefix, tempFileName.apply(suffix), "tmp");
+      FileEntry e = new FileEntry(name); 
+      if (files.putIfAbsent(name, e) == null) {
+        return e.createOutput(outputToInput);
+      }
+    }
+  }
+
+  /**
+   * Makes the file registered as {@code source} available as {@code dest} and
+   * removes the {@code source} name.
+   *
+   * @throws FileNotFoundException if {@code source} does not exist
+   * @throws FileAlreadyExistsException if {@code dest} already exists
+   * @throws IllegalStateException if {@code source} was removed or replaced concurrently
+   */
+  @Override
+  public void rename(String source, String dest) throws IOException {
+    ensureOpen();
+
+    FileEntry file = files.get(source);
+    if (file == null) {
+      throw new FileNotFoundException(source);
+    }
+    if (files.putIfAbsent(dest, file) != null) {
+      throw new FileAlreadyExistsException(dest);
+    }
+    // The conditional removal below is the only removal needed. An additional
+    // unconditional files.remove(source) would be redundant and could delete an
+    // entry that another thread registered under the same name in the meantime.
+    if (!files.remove(source, file)) {
+      throw new IllegalStateException("File was unexpectedly replaced: " + source);
+    }
+  }
+
+  /** Nothing to sync for this directory; only verifies that it is still open. */
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    ensureOpen();
+  }
+
+  /** Nothing to sync for this directory; only verifies that it is still open. */
+  @Override
+  public void syncMetaData() throws IOException {
+    ensureOpen();
+  }
+
+  /**
+   * Opens the named file for reading.
+   *
+   * @throws NoSuchFileException if no such file exists
+   * @throws AccessDeniedException if the file's output has not been closed yet
+   */
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    ensureOpen();
+    FileEntry e = files.get(name);
+    if (e == null) {
+      throw new NoSuchFileException(name);
+    } else {
+      return e.openInput();
+    }
+  }
+
+  /** Marks this directory as closed and forgets all files. */
+  @Override
+  public void close() throws IOException {
+    isOpen = false;
+    files.clear();
+  }
+
+  /**
+   * A single file of this directory. Its content and length become available
+   * when the output created by {@link #createOutput} is closed.
+   */
+  private final class FileEntry {
+    private final String fileName;
+
+    /** The readable content; null until the file's output has been closed. */
+    private volatile IndexInput content;
+    /** The length of the content; zero until the file's output has been closed. */
+    private volatile long cachedLength;
+
+    public FileEntry(String name) {
+      this.fileName = name;
+    }
+
+    public long length() {
+      // We return 0 length until the IndexOutput is closed and flushed.
+      return cachedLength;
+    }
+
+    /**
+     * Returns a clone of the stored content.
+     *
+     * @throws AccessDeniedException if the content has not been published yet
+     */
+    public IndexInput openInput() throws IOException {
+      IndexInput local = this.content;
+      if (local == null) {
+        throw new AccessDeniedException("Can't open a file still open for writing: " + fileName);
+      }
+
+      return local.clone();
+    }
+
+    /**
+     * Creates the output for this file. When that output is closed, the buffered
+     * data is converted with {@code outputToInput} and published as this file's content.
+     *
+     * @throws IOException if this file's content has already been written
+     */
+    final IndexOutput createOutput(BiFunction<String, ByteBuffersDataOutput, IndexInput> outputToInput) throws IOException {
+      if (content != null) {
+        throw new IOException("Can only write to a file once: " + fileName);
+      }
+
+      String clazzName = ByteBuffersDirectory.class.getSimpleName();
+      String outputName = String.format(Locale.ROOT, "%s output (file=%s)", clazzName, fileName);
+
+      return new ByteBuffersIndexOutput(
+          bbOutputSupplier.get(), outputName, fileName,
+          new CRC32(),
+          (output) -> {
+            content = outputToInput.apply(fileName, output);
+            cachedLength = output.size();
+          });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java
new file mode 100644
index 0000000..7c87d24
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RandomAccessInput;
+
+/**
+ * A random-access {@link IndexInput} that forwards every read to an underlying
+ * {@link ByteBuffersDataInput}. All operations other than {@link #close()} fail with
+ * {@link AlreadyClosedException} once the input has been closed.
+ */
+public final class ByteBuffersIndexInput extends IndexInput implements RandomAccessInput {
+  /** The backing input; {@code null} once {@link #close()} has been called. */
+  private ByteBuffersDataInput in;
+
+  public ByteBuffersIndexInput(ByteBuffersDataInput in, String resourceDescription) {
+    super(resourceDescription);
+    this.in = in;
+  }
+
+  /** Drops the reference to the backing input. */
+  @Override
+  public void close() throws IOException {
+    in = null;
+  }
+
+  @Override
+  public long getFilePointer() {
+    return openInput().position();
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    openInput().seek(pos);
+  }
+
+  @Override
+  public long length() {
+    return openInput().size();
+  }
+
+  @Override
+  public ByteBuffersIndexInput slice(String sliceDescription, long offset, long length) throws IOException {
+    final ByteBuffersDataInput sliced = openInput().slice(offset, length);
+    return new ByteBuffersIndexInput(sliced,
+        "(sliced) offset=" + offset + ", length=" + length + " " + toString());
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    return openInput().readByte();
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len) throws IOException {
+    openInput().readBytes(b, offset, len);
+  }
+
+  @Override
+  public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
+    openInput();
+    return slice("", offset, length);
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
+    openInput().readBytes(b, offset, len, useBuffer);
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    return openInput().readShort();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    return openInput().readInt();
+  }
+
+  @Override
+  public int readVInt() throws IOException {
+    return openInput().readVInt();
+  }
+
+  @Override
+  public int readZInt() throws IOException {
+    return openInput().readZInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return openInput().readLong();
+  }
+
+  @Override
+  public long readVLong() throws IOException {
+    return openInput().readVLong();
+  }
+
+  @Override
+  public long readZLong() throws IOException {
+    return openInput().readZLong();
+  }
+
+  @Override
+  public String readString() throws IOException {
+    return openInput().readString();
+  }
+
+  @Override
+  public Map<String, String> readMapOfStrings() throws IOException {
+    return openInput().readMapOfStrings();
+  }
+
+  @Override
+  public Set<String> readSetOfStrings() throws IOException {
+    return openInput().readSetOfStrings();
+  }
+
+  @Override
+  public void skipBytes(long numBytes) throws IOException {
+    // Only the closed check is done here; skipping itself is the superclass implementation.
+    openInput();
+    super.skipBytes(numBytes);
+  }
+
+  @Override
+  public byte readByte(long pos) throws IOException {
+    return openInput().readByte(pos);
+  }
+
+  @Override
+  public short readShort(long pos) throws IOException {
+    return openInput().readShort(pos);
+  }
+
+  @Override
+  public int readInt(long pos) throws IOException {
+    return openInput().readInt(pos);
+  }
+
+  @Override
+  public long readLong(long pos) throws IOException {
+    return openInput().readLong(pos);
+  }
+
+  /**
+   * Returns an independent input over the full content of this one, positioned
+   * at this input's current file pointer.
+   */
+  @Override
+  public IndexInput clone() {
+    final ByteBuffersDataInput source = openInput();
+    final ByteBuffersIndexInput copy =
+        new ByteBuffersIndexInput(source.slice(0, source.size()), "(clone of) " + toString());
+    try {
+      copy.seek(getFilePointer());
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    return copy;
+  }
+
+  /**
+   * Returns the backing input.
+   *
+   * @throws AlreadyClosedException if this input has been closed
+   */
+  private ByteBuffersDataInput openInput() {
+    final ByteBuffersDataInput local = in;
+    if (local == null) {
+      throw new AlreadyClosedException("Already closed.");
+    }
+    return local;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java
new file mode 100644
index 0000000..19dc400
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+/**
+ * An {@link IndexOutput} that forwards all writes to a {@link ByteBuffersDataOutput}
+ * and computes its checksum lazily from the buffered content.
+ */
+public final class ByteBuffersIndexOutput extends IndexOutput {
+  /** Invoked with the delegate the first time this output is closed; may be null. */
+  private final Consumer<ByteBuffersDataOutput> onClose;
+
+  /** The checksum implementation; may be null, in which case no checksum is available. */
+  private final Checksum checksum;
+  /** The delegate size for which {@link #lastChecksum} was computed. */
+  private long lastChecksumPosition;
+  private long lastChecksum;
+
+  /** The target of all writes; {@code null} once this output has been closed. */
+  private ByteBuffersDataOutput delegate;
+
+  /** Creates an output with a {@link CRC32} checksum and no close callback. */
+  public ByteBuffersIndexOutput(ByteBuffersDataOutput delegate, String resourceDescription, String name) {
+    this(delegate, resourceDescription, name, new CRC32(), null);
+  }
+
+  /**
+   * @param delegate receives everything written to this output
+   * @param resourceDescription passed to {@link IndexOutput}
+   * @param name passed to {@link IndexOutput}
+   * @param checksum the checksum used by {@link #getChecksum()}, or null for none
+   * @param onClose called with {@code delegate} when this output is closed, or null
+   */
+  public ByteBuffersIndexOutput(ByteBuffersDataOutput delegate, String resourceDescription, String name,
+      Checksum checksum,
+      Consumer<ByteBuffersDataOutput> onClose) {
+    super(resourceDescription, name);
+    this.delegate = delegate;
+    this.checksum = checksum;
+    this.onClose = onClose;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // No special effort to be thread-safe here since IndexOutputs are not required to be thread-safe.
+    final ByteBuffersDataOutput closing = delegate;
+    delegate = null;
+    if (closing == null || onClose == null) {
+      // Already closed before, or nobody to notify.
+      return;
+    }
+    onClose.accept(closing);
+  }
+
+  @Override
+  public long getFilePointer() {
+    return openDelegate().size();
+  }
+
+  /**
+   * Returns the checksum of everything written so far.
+   *
+   * @throws IOException if this output was created without a checksum
+   */
+  @Override
+  public long getChecksum() throws IOException {
+    final ByteBuffersDataOutput out = openDelegate();
+
+    if (checksum == null) {
+      throw new IOException("This index output has no checksum computing ability: " + toString());
+    }
+
+    // The checksum is recomputed over the whole current content of the delegate,
+    // and only if the size changed since the last computation. Doing it this way
+    // lets the write methods go straight to the delegate (IndexOutput is
+    // per-thread, so there are no concurrent changes).
+    final long size = out.size();
+    if (lastChecksumPosition == size) {
+      return lastChecksum;
+    }
+
+    lastChecksumPosition = size;
+    checksum.reset();
+    byte[] scratch = null;
+    for (ByteBuffer bb : out.toBufferList()) {
+      if (bb.hasArray()) {
+        checksum.update(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+        continue;
+      }
+
+      // No accessible backing array: copy through a scratch buffer, reading from a
+      // separate view of the buffer.
+      if (scratch == null) {
+        scratch = new byte [1024 * 4];
+      }
+      final ByteBuffer view = bb.asReadOnlyBuffer();
+      for (int left = view.remaining(); left > 0; ) {
+        final int len = Math.min(left, scratch.length);
+        view.get(scratch, 0, len);
+        checksum.update(scratch, 0, len);
+        left -= len;
+      }
+    }
+    lastChecksum = checksum.getValue();
+    return lastChecksum;
+  }
+
+  @Override
+  public void writeByte(byte b) throws IOException {
+    openDelegate().writeByte(b);
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int offset, int length) throws IOException {
+    openDelegate().writeBytes(b, offset, length);
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int length) throws IOException {
+    openDelegate().writeBytes(b, length);
+  }
+
+  @Override
+  public void writeInt(int i) throws IOException {
+    openDelegate().writeInt(i);
+  }
+
+  @Override
+  public void writeShort(short i) throws IOException {
+    openDelegate().writeShort(i);
+  }
+
+  @Override
+  public void writeLong(long i) throws IOException {
+    openDelegate().writeLong(i);
+  }
+
+  @Override
+  public void writeString(String s) throws IOException {
+    openDelegate().writeString(s);
+  }
+
+  @Override
+  public void copyBytes(DataInput input, long numBytes) throws IOException {
+    openDelegate().copyBytes(input, numBytes);
+  }
+
+  @Override
+  public void writeMapOfStrings(Map<String, String> map) throws IOException {
+    openDelegate().writeMapOfStrings(map);
+  }
+
+  @Override
+  public void writeSetOfStrings(Set<String> set) throws IOException {
+    openDelegate().writeSetOfStrings(set);
+  }
+
+  /**
+   * Returns the delegate.
+   *
+   * @throws AlreadyClosedException if this output has been closed
+   */
+  private ByteBuffersDataOutput openDelegate() {
+    final ByteBuffersDataOutput local = delegate;
+    if (local == null) {
+      throw new AlreadyClosedException("Already closed.");
+    }
+    return local;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java b/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java
new file mode 100644
index 0000000..4578a4f
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils.IOConsumer;
+import org.junit.Test;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom;
+import com.carrotsearch.randomizedtesting.generators.RandomBytes;
+import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
+
+/**
+ * Base class for tests of {@link DataOutput} implementations. The same seeded sequence
+ * of random writes is applied to the implementation under test and to a reference
+ * {@link OutputStreamDataOutput}, and the resulting bytes are compared.
+ */
+public abstract class BaseDataOutputTestCase<T extends DataOutput> extends RandomizedTest {
+  /** Creates a fresh instance of the output under test. */
+  protected abstract T newInstance();
+  /** Returns everything written to the given instance as a byte array. */
+  protected abstract byte[] toBytes(T instance);
+  
+  /** A two-argument function that is allowed to throw checked exceptions. */
+  @FunctionalInterface
+  private interface ThrowingBiFunction<T, U, R> {
+    R apply(T t, U u) throws Exception;
+  }
+
+  /**
+   * Writes identical random data to the output under test and to the reference output
+   * (both random generators start from the same seed) and expects identical bytes.
+   */
+  @Test
+  public void testRandomizedWrites() throws IOException {
+    T dst = newInstance();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput ref = new OutputStreamDataOutput(baos);
+
+    long seed = randomLong();
+    int max = 50_000;
+    addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
+    addRandomData(ref, new Xoroshiro128PlusRandom(seed), max);
+    assertArrayEquals(baos.toByteArray(), toBytes(dst));
+  }
+  
+  /**
+   * Applies {@code maxAddCalls} randomly picked generators to {@code dst}.
+   *
+   * @return one verifier per write, in write order; each verifier reads the written
+   *         value back from a {@link DataInput} and asserts that it matches.
+   * @throws IOException wrapping any exception thrown by a generator
+   */
+  protected static List<IOConsumer<DataInput>> addRandomData(DataOutput dst, Random rnd, int maxAddCalls) throws IOException {
+    try {
+      List<IOConsumer<DataInput>> reply = new ArrayList<>();
+      for (int i = 0; i < maxAddCalls; i++) {
+        reply.add(RandomPicks.randomFrom(rnd, GENERATORS).apply(dst, rnd));
+      }
+      return reply;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }  
+
+  // Each generator writes one random value to the output and returns the verifier
+  // that reads that value back. The order in which a generator draws from the
+  // Random matters: testRandomizedWrites relies on two runs with the same seed
+  // producing the same writes.
+  private static List<ThrowingBiFunction<DataOutput, Random, IOConsumer<DataInput>>> GENERATORS;
+  static {
+    GENERATORS = new ArrayList<>();
+
+    // writeByte/ readByte
+    GENERATORS.add((dst, rnd) -> {
+        byte value = (byte) rnd.nextInt();
+        dst.writeByte(value);
+        return (src) -> assertEquals("readByte()", value, src.readByte());
+      });
+
+    // writeBytes/ readBytes (array and buffer version).
+    GENERATORS.add((dst, rnd) -> {
+        byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100);
+        ByteBuffersDataOutput rdo = dst instanceof ByteBuffersDataOutput ? (ByteBuffersDataOutput) dst : null;
+
+        // The ByteBuffer variant only exists on ByteBuffersDataOutput; the random
+        // boolean is drawn first so it is consumed for every kind of output.
+        if (rnd.nextBoolean() && rdo != null) {
+          rdo.writeBytes(ByteBuffer.wrap(bytes));
+        } else {
+          dst.writeBytes(bytes, bytes.length);
+        }
+
+        boolean useBuffersForRead = rnd.nextBoolean();
+        return (src) -> {
+          byte [] read = new byte [bytes.length];
+          if (useBuffersForRead && src instanceof ByteBuffersDataInput) {
+            ((ByteBuffersDataInput) src).readBytes(ByteBuffer.wrap(read), read.length);
+            assertArrayEquals("readBytes(ByteBuffer)", bytes, read);
+          } else {
+            src.readBytes(read, 0, read.length);
+            assertArrayEquals("readBytes(byte[])", bytes, read);
+          }
+        };
+      }
+    );
+
+    // writeBytes/ readBytes (array + offset).
+    GENERATORS.add((dst, rnd) -> {
+        byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100);
+        int off = RandomNumbers.randomIntBetween(rnd, 0, bytes.length);
+        int len = RandomNumbers.randomIntBetween(rnd, 0, bytes.length - off); 
+        dst.writeBytes(bytes, off, len);
+
+        return (src) -> {
+          // Read back at the same offset and compare only the written sub-range.
+          byte [] read = new byte [bytes.length + off];
+          src.readBytes(read, off, len);
+          assertArrayEquals(
+              "readBytes(byte[], off)",
+              ArrayUtil.copyOfSubArray(bytes, off, len + off),
+              ArrayUtil.copyOfSubArray(read, off, len + off));
+        };
+      }
+    );
+    
+    // writeInt/ readInt
+    GENERATORS.add((dst, rnd) -> {
+      int v = rnd.nextInt(); 
+      dst.writeInt(v); 
+      return (src) -> assertEquals("readInt()", v, src.readInt());
+    });
+
+    // writeLong/ readLong
+    GENERATORS.add((dst, rnd) -> {
+      long v = rnd.nextLong(); 
+      dst.writeLong(v); 
+      return (src) -> assertEquals("readLong()", v, src.readLong());
+    });
+    
+    // writeShort/ readShort
+    GENERATORS.add((dst, rnd) -> {
+      short v = (short) rnd.nextInt(); 
+      dst.writeShort(v); 
+      return (src) -> assertEquals("readShort()", v, src.readShort());
+    });
+
+    // writeVInt/ readVInt
+    GENERATORS.add((dst, rnd) -> {
+      int v = rnd.nextInt(); 
+      dst.writeVInt(v); 
+      return (src) -> assertEquals("readVInt()", v, src.readVInt());
+    });
+
+    // writeZInt/ readZInt
+    GENERATORS.add((dst, rnd) -> {
+      int v = rnd.nextInt(); 
+      dst.writeZInt(v); 
+      return (src) -> assertEquals("readZInt()", v, src.readZInt());
+    });
+
+    // writeVLong/ readVLong
+    GENERATORS.add((dst, rnd) -> {
+      // The mask clears the sign bit, so only non-negative values are written
+      // (presumably because writeVLong does not accept negative values -- confirm).
+      long v = rnd.nextLong() & (-1L >>> 1); 
+      dst.writeVLong(v); 
+      return (src) -> assertEquals("readVLong()", v, src.readVLong());
+    });
+
+    // writeZLong/ readZLong
+    GENERATORS.add((dst, rnd) -> {
+      long v = rnd.nextLong(); 
+      dst.writeZLong(v); 
+      return (src) -> assertEquals("readZLong()", v, src.readZLong());
+    });
+
+    // writeString/ readString
+    GENERATORS.add((dst, rnd) -> {
+      String v;
+      if (rnd.nextInt(50) == 0) {
+        // Occasionally a large blob.
+        v = RandomStrings.randomUnicodeOfLength(rnd, RandomNumbers.randomIntBetween(rnd, 2048, 4096));
+      } else {
+        v = RandomStrings.randomUnicodeOfLength(rnd, RandomNumbers.randomIntBetween(rnd, 0, 10));
+      }
+      dst.writeString(v);
+      return (src) -> assertEquals("readString()", v, src.readString());
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
new file mode 100644
index 0000000..5d3d7f6
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import static org.junit.Assert.*;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils.IOConsumer;
+import org.apache.lucene.util.LuceneTestCase;
+import org.junit.Test;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom;
+import com.carrotsearch.randomizedtesting.annotations.Timeout;
+
+public final class TestByteBuffersDataInput extends RandomizedTest {
+  @Test
+  public void testSanity() throws IOException {
+    ByteBuffersDataOutput out = new ByteBuffersDataOutput();
+    ByteBuffersDataInput o1 = out.toDataInput();
+    assertEquals(0, o1.size());
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+        o1.readByte();
+    });
+
+    out.writeByte((byte) 1);
+
+    ByteBuffersDataInput o2 = out.toDataInput();
+    assertEquals(1, o2.size());
+    assertEquals(0, o2.position());
+    assertEquals(0, o1.size());
+
+    assertTrue(o2.ramBytesUsed() > 0);
+    assertEquals(1, o2.readByte());
+    assertEquals(1, o2.position());
+    assertEquals(1, o2.readByte(0));
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+        o2.readByte();
+    });
+
+    assertEquals(1, o2.position());
+  }
+
+  @Test
+  public void testRandomReads() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+
+    long seed = randomLong();
+    int max = 1_000_000;
+    List<IOConsumer<DataInput>> reply = 
+        TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
+
+    ByteBuffersDataInput src = dst.toDataInput();
+    for (IOConsumer<DataInput> c : reply) {
+      c.accept(src);
+    }
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      src.readByte();
+    });
+  }
+
+  @Test
+  public void testRandomReadsOnSlices() throws Exception {
+    for (int reps = randomIntBetween(1, 200); reps-- > 0;) { // post-decrement: always runs at least once.
+      ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+      // Zero-filled prefix of random length; the slice below has to skip it.
+      byte [] prefix = new byte [randomIntBetween(0, 1024 * 8)];
+      dst.writeBytes(prefix);
+      // Random payload; 'reply' is the list of read-back checks returned by addRandomData.
+      long seed = randomLong();
+      int max = 10_000;
+      List<IOConsumer<DataInput>> reply = 
+          TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
+      // Zero-filled suffix of random length; the slice below has to cut it off.
+      byte [] suffix = new byte [randomIntBetween(0, 1024 * 8)];
+      dst.writeBytes(suffix);
+      // Slice out exactly the payload window between prefix and suffix.
+      ByteBuffersDataInput src = dst.toDataInput().slice(prefix.length, dst.size() - prefix.length - suffix.length);
+      // A fresh slice reports position 0 and the window's length, then replays every check in order.
+      assertEquals(0, src.position());
+      assertEquals(dst.size() - prefix.length - suffix.length, src.size());
+      for (IOConsumer<DataInput> c : reply) {
+        c.accept(src);
+      }
+      // Once all checks have consumed the payload, the slice must be exhausted.
+      LuceneTestCase.expectThrows(EOFException.class, () -> {
+        src.readByte();
+      });
+    }
+  }
+
+  @Test
+  public void testSeekEmpty() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    ByteBuffersDataInput in = dst.toDataInput();
+    in.seek(0);
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      in.seek(1);
+    });
+
+    in.seek(0);
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      in.readByte();
+    });
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    for (int reps = randomIntBetween(1, 200); reps-- > 0;) { // post-decrement: always runs at least once.
+      ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+      // Half of the time put a zero-filled prefix in front so the slice starts at a non-zero offset.
+      byte [] prefix = {};
+      if (randomBoolean()) {
+        prefix = new byte [randomIntBetween(1, 1024 * 8)];
+        dst.writeBytes(prefix);
+      }
+      // Random payload; 'reply' is the list of read-back checks returned by addRandomData.
+      long seed = randomLong();
+      int max = 1000;
+      List<IOConsumer<DataInput>> reply = 
+          TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
+      // View everything after the prefix.
+      ByteBuffersDataInput in = dst.toDataInput().slice(prefix.length, dst.size() - prefix.length);
+      // First sequential pass from the start of the slice.
+      in.seek(0);
+      for (IOConsumer<DataInput> c : reply) {
+        c.accept(in);
+      }
+      // Rewinding must allow an identical second pass.
+      in.seek(0);
+      for (IOConsumer<DataInput> c : reply) {
+        c.accept(in);
+      }
+      // Random-access pass: compare single bytes against a flat copy of the same window.
+      byte [] array = dst.toArrayCopy();
+      array = ArrayUtil.copyOfSubArray(array, prefix.length, array.length);
+      for (int i = 0; i < 1000; i++) {
+        int offs = randomIntBetween(0, array.length - 1);
+        in.seek(offs);
+        assertEquals(offs, in.position());
+        assertEquals(array[offs], in.readByte());
+      }
+      in.seek(in.size()); // Seeking to the very end is legal; reading from there is not.
+      assertEquals(in.size(), in.position());
+      LuceneTestCase.expectThrows(EOFException.class, () -> {
+        in.readByte();
+      });
+    }
+  }
+
+  @Test
+  public void testSlicingWindow() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    assertEquals(0, dst.toDataInput().slice(0, 0).size()); // An empty output still yields an empty slice.
+    // Fill 8 KiB and slide windows of size 0, 1 and up to 1024 over every offset.
+    dst.writeBytes(randomBytesOfLength(1024 * 8));
+    ByteBuffersDataInput in = dst.toDataInput();
+    for (int offset = 0, max = (int) dst.size(); offset < max; offset++) {
+      assertEquals(0, in.slice(offset, 0).size());
+      assertEquals(1, in.slice(offset, 1).size());
+      // Cap the window so it never reaches past the end of the data.
+      int window = Math.min(max - offset, 1024);
+      assertEquals(window, in.slice(offset, window).size());
+    }
+    assertEquals(0, in.slice((int) dst.size(), 0).size());
+  }
+
+  @Test
+  @Timeout(millis = 5000)
+  public void testEofOnArrayReadPastBufferSize() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    dst.writeBytes(new byte [10]);
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      ByteBuffersDataInput in = dst.toDataInput();
+      in.readBytes(new byte [100], 0, 100);
+    });
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      ByteBuffersDataInput in = dst.toDataInput();
+      in.readBytes(ByteBuffer.allocate(100), 100);
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java
new file mode 100644
index 0000000..893aa37
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public final class TestByteBuffersDataOutput extends BaseDataOutputTestCase<ByteBuffersDataOutput> {
+  @Override
+  protected ByteBuffersDataOutput newInstance() {
+    return new ByteBuffersDataOutput();
+  }
+  
+  @Override
+  protected byte[] toBytes(ByteBuffersDataOutput instance) {
+    return instance.toArrayCopy();
+  }
+
+  @Test
+  public void testReuse() throws IOException {
+    AtomicInteger allocations = new AtomicInteger(0);
+    ByteBuffersDataOutput.ByteBufferRecycler reuser = new ByteBuffersDataOutput.ByteBufferRecycler(
+        (size) -> {
+          allocations.incrementAndGet();
+          return ByteBuffer.allocate(size);
+        });
+    
+    ByteBuffersDataOutput o = new ByteBuffersDataOutput(
+        ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK,
+        ByteBuffersDataOutput.DEFAULT_MAX_BITS_PER_BLOCK, 
+        reuser::allocate,
+        reuser::reuse);
+
+    // Add some random data first.
+    long genSeed = randomLong();
+    int addCount = randomIntBetween(1000, 5000);
+    addRandomData(o, new Random(genSeed), addCount);
+    byte[] data = o.toArrayCopy();
+
+    // Use the same sequence over reused instance.
+    final int expectedAllocationCount = allocations.get();
+    o.reset();
+    addRandomData(o, new Random(genSeed), addCount);
+
+    assertEquals(expectedAllocationCount, allocations.get());
+    assertArrayEquals(data, o.toArrayCopy());
+  }
+
+  @Test
+  public void testConstructorWithExpectedSize() {
+    {
+      ByteBuffersDataOutput o = new ByteBuffersDataOutput(0);
+      o.writeByte((byte) 0);
+      assertEquals(1 << ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK, o.toBufferList().get(0).capacity());
+    }
+
+    {
+      long MB = 1024 * 1024;
+      long expectedSize = randomLongBetween(MB, MB * 1024);
+      ByteBuffersDataOutput o = new ByteBuffersDataOutput(expectedSize);
+      o.writeByte((byte) 0);
+      int cap = o.toBufferList().get(0).capacity();
+      assertTrue((cap >> 1) * ByteBuffersDataOutput.MAX_BLOCKS_BEFORE_BLOCK_EXPANSION < expectedSize);
+      assertTrue("cap=" + cap + ", exp=" + expectedSize,
+          (cap) * ByteBuffersDataOutput.MAX_BLOCKS_BEFORE_BLOCK_EXPANSION >= expectedSize);
+    }
+  }
+
+  @Test
+  public void testSanity() {
+    ByteBuffersDataOutput o = newInstance();
+    assertEquals(0, o.size());
+    assertEquals(0, o.toArrayCopy().length);
+    assertEquals(0, o.ramBytesUsed());
+
+    o.writeByte((byte) 1);
+    assertEquals(1, o.size());
+    assertTrue(o.ramBytesUsed() > 0);
+    assertArrayEquals(new byte [] { 1 }, o.toArrayCopy());
+
+    o.writeBytes(new byte [] {2, 3, 4}, 3);
+    assertEquals(4, o.size());
+    assertArrayEquals(new byte [] { 1, 2, 3, 4 }, o.toArrayCopy());    
+  }
+
+  @Test
+  public void testWriteByteBuffer() {
+    ByteBuffersDataOutput o = new ByteBuffersDataOutput();
+    byte[] bytes = randomBytesOfLength(1024 * 8 + 10);
+    ByteBuffer src = ByteBuffer.wrap(bytes);
+    int offset = randomIntBetween(0, 100);
+    int len = bytes.length - offset;
+    src.position(offset);
+    src.limit(offset + len);
+    o.writeBytes(src);
+    assertEquals(len, o.size());
+    Assert.assertArrayEquals(ArrayUtil.copyOfSubArray(bytes, offset, offset + len), o.toArrayCopy());
+  }
+
+  @Test
+  public void testLargeArrayAdd() {
+    ByteBuffersDataOutput o = new ByteBuffersDataOutput();
+    int MB = 1024 * 1024;
+    byte [] bytes = randomBytesOfLength(5 * MB, 15 * MB);
+    int offset = randomIntBetween(0, 100);
+    int len = bytes.length - offset;
+    o.writeBytes(bytes, offset, len);
+    assertEquals(len, o.size());
+    Assert.assertArrayEquals(ArrayUtil.copyOfSubArray(bytes, offset, offset + len), o.toArrayCopy());
+  }
+
+  @Test
+  public void testToBufferListReturnsReadOnlyBuffers() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    dst.writeBytes(new byte [100]);
+    for (ByteBuffer bb : dst.toBufferList()) {
+      assertTrue(bb.isReadOnly());
+    }
+  }
+  
+  @Test
+  public void testToWriteableBufferListReturnsOriginalBuffers() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    for (ByteBuffer bb : dst.toWriteableBufferList()) {
+      assertTrue(!bb.isReadOnly());
+      assertTrue(bb.hasArray()); // even the empty buffer should have a backing array.
+    }
+
+    dst.writeBytes(new byte [100]);
+    for (ByteBuffer bb : dst.toWriteableBufferList()) {
+      assertTrue(!bb.isReadOnly());
+      assertTrue(bb.hasArray()); // heap-based by default, so array should be there.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/7eb7b90e/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java
new file mode 100644
index 0000000..5f2d447
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.util.English;
+import org.junit.Test;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+public class TestByteBuffersDirectory extends BaseDirectoryTestCase {
+  private Supplier<ByteBuffersDirectory> implSupplier;
+
+  public TestByteBuffersDirectory(Supplier<ByteBuffersDirectory> implSupplier, String name) {
+    this.implSupplier = implSupplier;
+  }
+  
+  @Override
+  protected Directory getDirectory(Path path) throws IOException {
+    return implSupplier.get();
+  }
+
+  @Test
+  public void testBuildIndex() throws IOException {
+    try (Directory dir = getDirectory(null);
+         IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(
+            new MockAnalyzer(random())).setOpenMode(OpenMode.CREATE))) {
+      int docs = RandomizedTest.randomIntBetween(0, 10);
+      for (int i = docs; i > 0; i--) {
+        Document doc = new Document();
+        doc.add(newStringField("content", English.intToEnglish(i).trim(), Field.Store.YES));
+        writer.addDocument(doc);
+      }
+      writer.commit();
+      assertEquals(docs, writer.numDocs());
+    }
+  }
+  
+  @ParametersFactory(argumentFormatting = "impl=%2$s")
+  public static Iterable<Object[]> parametersWithCustomName() {
+    return Arrays.asList(new Object [][] {
+      {(Supplier<ByteBuffersDirectory>) () -> new ByteBuffersDirectory(
+          new SingleInstanceLockFactory(), 
+          ByteBuffersDataOutput::new,
+          ByteBuffersDirectory.OUTPUT_AS_MANY_BUFFERS), "many buffers (heap)"},
+      {(Supplier<ByteBuffersDirectory>) () -> new ByteBuffersDirectory(
+          new SingleInstanceLockFactory(), 
+          ByteBuffersDataOutput::new,
+          ByteBuffersDirectory.OUTPUT_AS_ONE_BUFFER), "one buffer (heap)"},
+      {(Supplier<ByteBuffersDirectory>) () -> new ByteBuffersDirectory(
+          new SingleInstanceLockFactory(), 
+          ByteBuffersDataOutput::new,
+          ByteBuffersDirectory.OUTPUT_AS_MANY_BUFFERS_LUCENE), "lucene's buffers (heap)"},
+      {(Supplier<ByteBuffersDirectory>) () -> new ByteBuffersDirectory(
+          new SingleInstanceLockFactory(), 
+          ByteBuffersDataOutput::new,
+          ByteBuffersDirectory.OUTPUT_AS_BYTE_ARRAY), "byte array (heap)"},
+    });
+  }
+}


[2/2] lucene-solr:master: LUCENE-8468: A ByteBuffer based Directory implementation (and associated classes).

Posted by dw...@apache.org.
LUCENE-8468: A ByteBuffer based Directory implementation (and associated classes).


Project: http://git-wip-us.apache.org/repos/asf/lucene-solr/repo
Commit: http://git-wip-us.apache.org/repos/asf/lucene-solr/commit/f762953a
Tree: http://git-wip-us.apache.org/repos/asf/lucene-solr/tree/f762953a
Diff: http://git-wip-us.apache.org/repos/asf/lucene-solr/diff/f762953a

Branch: refs/heads/master
Commit: f762953aab5be90052ee8bbfe6cbc9f1535356a6
Parents: a452dd9
Author: Dawid Weiss <dw...@apache.org>
Authored: Tue Aug 28 15:02:17 2018 +0200
Committer: Dawid Weiss <dw...@apache.org>
Committed: Tue Aug 28 15:02:53 2018 +0200

----------------------------------------------------------------------
 lucene/CHANGES.txt                              |   2 +
 .../lucene/store/ByteArrayIndexInput.java       | 107 +++-
 .../lucene/store/ByteBuffersDataInput.java      | 323 +++++++++++
 .../lucene/store/ByteBuffersDataOutput.java     | 541 +++++++++++++++++++
 .../lucene/store/ByteBuffersDirectory.java      | 275 ++++++++++
 .../lucene/store/ByteBuffersIndexInput.java     | 200 +++++++
 .../lucene/store/ByteBuffersIndexOutput.java    | 171 ++++++
 .../lucene/store/BaseDataOutputTestCase.java    | 181 +++++++
 .../lucene/store/TestByteBuffersDataInput.java  | 206 +++++++
 .../lucene/store/TestByteBuffersDataOutput.java | 157 ++++++
 .../lucene/store/TestByteBuffersDirectory.java  |  86 +++
 11 files changed, 2219 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/CHANGES.txt
----------------------------------------------------------------------
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 2dec765..413dda5 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -237,6 +237,8 @@ Changes in Runtime Behavior:
 
 Improvements
 
+* LUCENE-8468: A ByteBuffer based Directory implementation. (Dawid Weiss)
+
 * LUCENE-8447: Add DISJOINT and WITHIN support to LatLonShape queries. (Nick Knize)
 
 * LUCENE-8440: Add support for indexing and searching Line and Point shapes using LatLonShape encoding (Nick Knize)

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
index 6ad6125..9bf5ab2 100644
--- a/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteArrayIndexInput.java
@@ -14,55 +14,56 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.lucene.store;
 
+import java.io.EOFException;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Locale;
 
-/** 
- * DataInput backed by a byte array.
- * <b>WARNING:</b> This class omits all low-level checks.
- * @lucene.experimental 
+/**
+ * A {@link IndexInput} backed by a byte array.
+ * 
+ * @lucene.experimental
  */
-public final class ByteArrayIndexInput extends IndexInput {
-
+public final class ByteArrayIndexInput extends IndexInput implements RandomAccessInput {
   private byte[] bytes;
 
+  private final int offset;
+  private final int length;
+
   private int pos;
-  private int limit;
 
   public ByteArrayIndexInput(String description, byte[] bytes) {
+    this(description, bytes, 0, bytes.length);
+  }
+  
+  public ByteArrayIndexInput(String description, byte[] bytes, int offs, int length) {
     super(description);
+    this.offset = offs;
     this.bytes = bytes;
-    this.limit = bytes.length;
+    this.length = length;
+    this.pos = offs;
   }
 
   public long getFilePointer() {
-    return pos;
+    return pos - offset;
   }
   
-  public void seek(long pos) {
-    this.pos = (int) pos;
-  }
-
-  public void reset(byte[] bytes, int offset, int len) {
-    this.bytes = bytes;
-    pos = offset;
-    limit = offset + len;
+  public void seek(long pos) throws EOFException {
+    int newPos = Math.toIntExact(pos + offset);
+    try {
+      if (pos < 0 || pos > length) {
+        throw new EOFException();
+      }
+    } finally {
+      this.pos = newPos;
+    }
   }
 
   @Override
   public long length() {
-    return limit;
-  }
-
-  public boolean eof() {
-    return pos == limit;
-  }
-
-  @Override
-  public void skipBytes(long count) {
-    pos += count;
+    return length;
   }
 
   @Override
@@ -153,9 +154,55 @@ public final class ByteArrayIndexInput extends IndexInput {
 
   @Override
   public void close() {
+    bytes = null;
   }
 
-  public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
-    throw new UnsupportedOperationException();
+  @Override
+  public IndexInput clone() {
+    ByteArrayIndexInput slice = slice("(cloned)" + toString(), 0, length());
+    try {
+      slice.seek(getFilePointer());
+    } catch (EOFException e) {
+      throw new UncheckedIOException(e);
+    }
+    return slice;
+  }
+
+  public ByteArrayIndexInput slice(String sliceDescription, long offset, long length) {
+    if (offset < 0 || length < 0 || offset + length > this.length) {
+      throw new IllegalArgumentException(String.format(Locale.ROOT,
+          "slice(offset=%s, length=%s) is out of bounds: %s",
+          offset, length, this));
+    }
+
+    return new ByteArrayIndexInput(sliceDescription, this.bytes, Math.toIntExact(this.offset + offset), 
+        Math.toIntExact(length));
+  }
+
+  @Override
+  public byte readByte(long pos) throws IOException {
+    return bytes[Math.toIntExact(offset + pos)];
+  }
+
+  @Override
+  public short readShort(long pos) throws IOException {
+    int i = Math.toIntExact(offset + pos);
+    return (short) (((bytes[i]     & 0xFF) << 8) |
+                     (bytes[i + 1] & 0xFF));
+  }
+
+  @Override
+  public int readInt(long pos) throws IOException {
+    int i = Math.toIntExact(offset + pos);
+    return ((bytes[i]     & 0xFF) << 24) | 
+           ((bytes[i + 1] & 0xFF) << 16) | 
+           ((bytes[i + 2] & 0xFF) <<  8) | 
+            (bytes[i + 3] & 0xFF);
+  }
+
+  @Override
+  public long readLong(long pos) throws IOException {
+    return (((long) readInt(pos)) << 32) | 
+             (readInt(pos + 4) & 0xFFFFFFFFL);
   }
 }

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
new file mode 100644
index 0000000..e8418ed
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.EOFException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.RandomAccessInput;
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.RamUsageEstimator;
+
+/**
+ * A {@link DataInput} implementing {@link RandomAccessInput} and reading data from a
+ * list of {@link ByteBuffer}s.
+ */
+public final class ByteBuffersDataInput extends DataInput implements Accountable, RandomAccessInput {
+  private final ByteBuffer[] blocks;
+  private final int blockBits;
+  private final int blockMask;
+  private final long size;
+  private final long offset;
+
+  private long pos;
+
+  /**
+   * Read data from a set of contiguous buffers. All data buffers except for the last one 
+   * must have an identical remaining number of bytes in the buffer (that is a power of two). The last
+   * buffer can be of an arbitrary remaining length.
+   */
+  public ByteBuffersDataInput(List<ByteBuffer> buffers) {
+    ensureAssumptions(buffers);
+
+    this.blocks = buffers.stream().map(buf -> buf.asReadOnlyBuffer()).toArray(ByteBuffer[]::new);
+
+    if (blocks.length == 1) {
+      this.blockBits = 32;
+      this.blockMask = ~0;
+    } else {
+      final int blockBytes = determineBlockPage(buffers);
+      this.blockBits = Integer.numberOfTrailingZeros(blockBytes);
+      this.blockMask = (1 << blockBits) - 1;
+    }
+
+    this.size = Arrays.stream(blocks).mapToLong(block -> block.remaining()).sum();
+
+    // The initial "position" of this stream is shifted by the position of the first block.
+    this.offset = blocks[0].position();
+    this.pos = offset;
+  }
+
+  public long size() {
+    return size;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    // Return a rough estimation for allocated blocks. Note that we do not make
+    // any special distinction for what the type of buffer is (direct vs. heap-based).
+    return RamUsageEstimator.NUM_BYTES_OBJECT_REF * blocks.length + 
+           Arrays.stream(blocks).mapToLong(buf -> buf.capacity()).sum();
+  }
+
+  @Override
+  public byte readByte() throws EOFException {
+    try {
+      ByteBuffer block = blocks[blockIndex(pos)];
+      byte v = block.get(blockOffset(pos));
+      pos++;
+      return v;
+    } catch (IndexOutOfBoundsException e) {
+      if (pos >= size()) {
+        throw new EOFException();
+      } else {
+        throw e; // Something is wrong.
+      }
+    }
+  }
+
+  /**
+   * Reads exactly {@code len} bytes into the given buffer. The buffer must have
+   * enough remaining limit.
+   * 
+   * If there are fewer than {@code len} bytes in the input, {@link EOFException} 
+   * is thrown. 
+   */
+  public void readBytes(ByteBuffer buffer, int len) throws EOFException {
+    try {
+      while (len > 0) {
+        ByteBuffer block = blocks[blockIndex(pos)].duplicate();
+        int blockOffset = blockOffset(pos);
+        block.position(blockOffset);
+        int chunk = Math.min(len, block.remaining());
+        if (chunk == 0) {
+          throw new EOFException();
+        }
+
+        // Update pos early on for EOF detection on output buffer, then try to get buffer content.
+        pos += chunk;
+        block.limit(blockOffset + chunk);
+        buffer.put(block);
+
+        len -= chunk;
+      }
+    } catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) {
+      if (pos >= size()) {
+        throw new EOFException();
+      } else {
+        throw e; // Something is wrong.
+      }
+    }
+  }
+
+  @Override
+  public void readBytes(byte[] arr, int off, int len) throws EOFException {
+    try {
+      while (len > 0) {
+        ByteBuffer block = blocks[blockIndex(pos)].duplicate();
+        block.position(blockOffset(pos));
+        int chunk = Math.min(len, block.remaining());
+        if (chunk == 0) {
+          throw new EOFException();
+        }
+
+        // Update pos early on for EOF detection, then try to get buffer content.
+        pos += chunk;
+        block.get(arr, off, chunk);
+
+        len -= chunk;
+        off += chunk;
+      }
+    } catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) {
+      if (pos >= size()) {
+        throw new EOFException();
+      } else {
+        throw e; // Something is wrong.
+      }
+    }
+  }
+
+  @Override
+  public byte readByte(long pos) {
+    pos += offset;
+    return blocks[blockIndex(pos)].get(blockOffset(pos));
+  }
+
+  @Override
+  public short readShort(long pos) {
+    long absPos = offset + pos;
+    int blockOffset = blockOffset(absPos);
+    if (blockOffset + Short.BYTES <= blockMask) {
+      return blocks[blockIndex(absPos)].getShort(blockOffset);
+    } else {
+      return (short) ((readByte(pos    ) & 0xFF) << 8 | 
+                      (readByte(pos + 1) & 0xFF));
+    }
+  }
+
+  @Override
+  public int readInt(long pos) {
+    long absPos = offset + pos;
+    int blockOffset = blockOffset(absPos);
+    if (blockOffset + Integer.BYTES <= blockMask) {
+      return blocks[blockIndex(absPos)].getInt(blockOffset);
+    } else {
+      return ((readByte(pos    )       ) << 24 |
+              (readByte(pos + 1) & 0xFF) << 16 |
+              (readByte(pos + 2) & 0xFF) << 8  |
+              (readByte(pos + 3) & 0xFF));
+    }
+  }
+
+  @Override
+  public long readLong(long pos) {
+    long absPos = offset + pos;
+    int blockOffset = blockOffset(absPos);
+    if (blockOffset + Long.BYTES <= blockMask) {
+      return blocks[blockIndex(absPos)].getLong(blockOffset);
+    } else {
+      return (((long) readInt(pos)) << 32) | (readInt(pos + 4) & 0xFFFFFFFFL);
+    }
+  }
+
+  public long position() {
+    return pos - offset;
+  }
+
+  public void seek(long position) throws EOFException {
+    this.pos = position + offset; // 'pos' is absolute: it includes the first block's starting offset.
+    if (position > size()) {
+      this.pos = size() + offset; // Clamp to the end in absolute terms so position() == size() afterwards.
+      throw new EOFException();
+    }
+  }
+  
+  public ByteBuffersDataInput slice(long offset, long length) {
+    if (offset < 0 || length < 0 || offset + length > this.size) {
+      throw new IllegalArgumentException(String.format(Locale.ROOT,
+          "slice(offset=%s, length=%s) is out of bounds: %s",
+          offset, length, this));
+    }
+
+    return new ByteBuffersDataInput(sliceBufferList(Arrays.asList(this.blocks), offset, length));
+  }
+
+  @Override
+  public String toString() {
+    return String.format(Locale.ROOT,
+        "%,d bytes, block size: %,d, blocks: %,d, position: %,d%s",
+        size(),
+        blockSize(),
+        blocks.length,
+        position(),
+        offset == 0 ? "" : String.format(Locale.ROOT, " [offset: %,d]", offset));
+  }
+
+  private final int blockIndex(long pos) {
+    return Math.toIntExact(pos >> blockBits);
+  }  
+
+  private final int blockOffset(long pos) {
+    return (int) pos & blockMask;
+  }  
+  
+  private int blockSize() {
+    return 1 << blockBits;
+  }
+
+  private static final boolean isPowerOfTwo(int v) {
+    return (v & (v - 1)) == 0;
+  }
+
+  private static void ensureAssumptions(List<ByteBuffer> buffers) {
+    if (buffers.isEmpty()) {
+      throw new IllegalArgumentException("Buffer list must not be empty.");
+    }
+
+    if (buffers.size() == 1) {
+      // Special case of just a single buffer, conditions don't apply.
+    } else {
+      final int blockPage = determineBlockPage(buffers);
+      
+      // First buffer decides on block page length.
+      if (!isPowerOfTwo(blockPage)) {
+        throw new IllegalArgumentException("The first buffer must have power-of-two position() + remaining(): 0x"
+            + Integer.toHexString(blockPage));
+      }
+
+      // Every buffer after the first (the last one included) must start at 0; all but the last must fill a page.
+      for (int i = 1, last = buffers.size() - 1; i <= last; i++) {
+        ByteBuffer buffer = buffers.get(i);
+        if (buffer.position() != 0) {
+          throw new IllegalArgumentException("All buffers except for the first one must have position() == 0: " + buffer);
+        }
+        if (i != last && buffer.remaining() != blockPage) {
+          throw new IllegalArgumentException("Intermediate buffers must share an identical remaining() power-of-two block size: 0x" 
+              + Integer.toHexString(blockPage));
+        }
+      }
+    }
+  }
+
+  static int determineBlockPage(List<ByteBuffer> buffers) {
+    ByteBuffer first = buffers.get(0);
+    final int blockPage = Math.toIntExact((long) first.position() + first.remaining());
+    return blockPage;
+  }
+
+  private static List<ByteBuffer> sliceBufferList(List<ByteBuffer> buffers, long offset, long length) {
+    ensureAssumptions(buffers);
+    // Builds the buffer list for the window [offset, offset + length), measured from the first buffer's position().
+    if (buffers.size() == 1) {
+      ByteBuffer cloned = buffers.get(0).asReadOnlyBuffer();
+      cloned.position(Math.toIntExact(cloned.position() + offset));
+      cloned.limit(Math.toIntExact(length + cloned.position()));
+      return Arrays.asList(cloned);
+    } else {
+      long absStart = buffers.get(0).position() + offset;
+      long absEnd = absStart + length; // Kept as a long: multi-block data may extend past Integer.MAX_VALUE.
+      // The first buffer's position() + remaining() defines the (power-of-two) block size.
+      int blockBytes = ByteBuffersDataInput.determineBlockPage(buffers);
+      int blockBits = Integer.numberOfTrailingZeros(blockBytes);
+      int blockMask = (1 << blockBits) - 1;
+      // Mask in long arithmetic first, then narrow; the in-block offset always fits in an int.
+      int endOffset = (int) (absEnd & blockMask);
+      // Take read-only views of every block the window touches (only the block indices need to fit in an int).
+      ArrayList<ByteBuffer> cloned = 
+        buffers.subList(Math.toIntExact(absStart / blockBytes), 
+                        Math.toIntExact(absEnd / blockBytes + (endOffset == 0 ? 0 : 1)))
+          .stream()
+          .map(buf -> buf.asReadOnlyBuffer())
+          .collect(Collectors.toCollection(ArrayList::new));
+      // Window ends on a block boundary: append an empty tail so limit(0) below does not truncate a full block.
+      if (endOffset == 0) {
+        cloned.add(ByteBuffer.allocate(0));
+      }
+      // Trim the first block to the window start and the last block to the window end.
+      cloned.get(0).position((int) (absStart & blockMask));
+      cloned.get(cloned.size() - 1).limit(endOffset);
+      return cloned;
+    }
+  }  
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java
new file mode 100644
index 0000000..8840f21
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataOutput.java
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.IntConsumer;
+import java.util.function.IntFunction;
+
+import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.apache.lucene.util.UnicodeUtil;
+
+/**
+ * A {@link DataOutput} storing data in a list of {@link ByteBuffer}s.
+ */
+public final class ByteBuffersDataOutput extends DataOutput implements Accountable {
+  /** Zero-length placeholder used while no block has been allocated yet. */
+  private final static ByteBuffer EMPTY = ByteBuffer.allocate(0);
+  private final static byte [] EMPTY_BYTE_ARRAY = {};
+
+  /** Block allocator delegating to {@link ByteBuffer#allocate(int)}. */
+  public final static IntFunction<ByteBuffer> ALLOCATE_BB_ON_HEAP = ByteBuffer::allocate;
+
+  /**
+   * A singleton instance of "no-reuse" buffer strategy. It throws a
+   * {@link RuntimeException} for any block passed to it (which is what
+   * {@link #reset()} does for every allocated block).
+   */
+  public final static Consumer<ByteBuffer> NO_REUSE = (bb) -> {
+    throw new RuntimeException("reset() is not allowed on this buffer.");
+  };
+
+  /**
+   * An implementation of a {@link ByteBuffer} allocation and recycling policy.
+   * The blocks are recycled if exactly the same size is requested, otherwise
+   * they're released to be GCed.
+   */
+  public final static class ByteBufferRecycler {
+    private final ArrayDeque<ByteBuffer> reuse = new ArrayDeque<>();
+    private final IntFunction<ByteBuffer> delegate;
+
+    /**
+     * @param delegate allocator used when no recycled buffer of the requested size is available;
+     *                 must not be {@code null}
+     */
+    public ByteBufferRecycler(IntFunction<ByteBuffer> delegate) {
+      this.delegate = Objects.requireNonNull(delegate);
+    }
+
+    /**
+     * Returns a recycled buffer of exactly {@code size} bytes if one is queued, dropping
+     * any queued buffers of a different size on the way; otherwise asks the delegate
+     * allocator for a new buffer.
+     */
+    public ByteBuffer allocate(int size) {
+      while (!reuse.isEmpty()) {
+        ByteBuffer bb = reuse.removeFirst();
+        // If we don't have a buffer of exactly the requested size, discard it.
+        if (bb.remaining() == size) {
+          return bb;
+        }
+      }
+
+      return delegate.apply(size);        
+    }
+
+    /**
+     * Queues {@code buffer} for a later {@link #allocate(int)} call.
+     */
+    public void reuse(ByteBuffer buffer) {
+      // clear() resets the limit as well as the position, so a buffer that was flipped
+      // before being handed back is compared (and reused) by its full capacity.
+      buffer.clear();
+      reuse.addLast(buffer);
+    }
+  }
+
+  public final static int DEFAULT_MIN_BITS_PER_BLOCK = 10; // 1024 B
+  public final static int DEFAULT_MAX_BITS_PER_BLOCK = 26; //   64 MB
+
+  /**
+   * Maximum number of blocks at the current {@link #blockBits} block size
+   * before we increase the block size (and thus decrease the number of blocks).
+   */
+  final static int MAX_BLOCKS_BEFORE_BLOCK_EXPANSION = 100;
+
+  /**
+   * Maximum block size: {@code 2^bits}.
+   */
+  private final int maxBitsPerBlock;
+
+  /**
+   * {@link ByteBuffer} supplier.
+   */
+  private final IntFunction<ByteBuffer> blockAllocate;
+
+  /**
+   * {@link ByteBuffer} recycler on {@link #reset}.
+   */
+  private final Consumer<ByteBuffer> blockReuse;
+
+  /**
+   * Current block size: {@code 2^bits}.
+   */
+  private int blockBits;
+
+  /**
+   * Blocks storing data.
+   */
+  private final ArrayDeque<ByteBuffer> blocks = new ArrayDeque<>();
+
+  /**
+   * The current-or-next write block.
+   */
+  private ByteBuffer currentBlock = EMPTY;
+
+  /**
+   * Creates a heap-allocating, non-resettable output whose initial block size is
+   * derived from {@code expectedSize}.
+   *
+   * @param expectedSize the expected number of bytes to be written
+   */
+  public ByteBuffersDataOutput(long expectedSize) {
+    this(computeBlockSizeBitsFor(expectedSize), DEFAULT_MAX_BITS_PER_BLOCK, ALLOCATE_BB_ON_HEAP, NO_REUSE);
+  }
+
+  /**
+   * Creates a heap-allocating, non-resettable output using the default minimum and
+   * maximum block sizes.
+   */
+  public ByteBuffersDataOutput() {
+    this(DEFAULT_MIN_BITS_PER_BLOCK, DEFAULT_MAX_BITS_PER_BLOCK, ALLOCATE_BB_ON_HEAP, NO_REUSE);
+  }
+
+  /**
+   * @param minBitsPerBlock initial block size, as a power of two exponent (at least 10)
+   * @param maxBitsPerBlock largest block size, as a power of two exponent
+   *                        (at least {@code minBitsPerBlock}, at most 31)
+   * @param blockAllocate   allocator called with the required block size in bytes
+   * @param blockReuse      receiver of blocks that are no longer needed (see {@link #reset()})
+   * @throws IllegalArgumentException if the bit sizes are outside of the ranges above
+   * @throws NullPointerException if {@code blockAllocate} or {@code blockReuse} is {@code null}
+   */
+  public ByteBuffersDataOutput(int minBitsPerBlock, 
+                               int maxBitsPerBlock,
+                               IntFunction<ByteBuffer> blockAllocate,
+                               Consumer<ByteBuffer> blockReuse) {
+    if (minBitsPerBlock < 10 ||
+        minBitsPerBlock > maxBitsPerBlock ||
+        maxBitsPerBlock > 31) {
+      throw new IllegalArgumentException(String.format(Locale.ROOT,
+          "Invalid arguments: %s %s",
+          minBitsPerBlock,
+          maxBitsPerBlock));
+    }
+    this.maxBitsPerBlock = maxBitsPerBlock;
+    this.blockBits = minBitsPerBlock;
+    this.blockAllocate = Objects.requireNonNull(blockAllocate, "Block allocator must not be null.");
+    this.blockReuse = Objects.requireNonNull(blockReuse, "Block reuse must not be null.");
+  }
+
+  @Override
+  public void writeByte(byte b) {
+    if (!currentBlock.hasRemaining()) {
+      appendBlock();
+    }
+    currentBlock.put(b);
+  }
+
+  @Override
+  public void writeBytes(byte[] src, int offset, int length) {
+    assert length >= 0;
+    // Copy in chunks, each chunk filling up (at most) the remainder of the current block.
+    while (length > 0) {
+      if (!currentBlock.hasRemaining()) {
+        appendBlock();
+      }
+
+      int chunk = Math.min(currentBlock.remaining(), length);
+      currentBlock.put(src, offset, chunk);
+      length -= chunk;
+      offset += chunk;
+    }
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int length) {
+    writeBytes(b, 0, length);
+  }
+
+  /**
+   * Writes the entire content of {@code b}.
+   */
+  public void writeBytes(byte[] b) {
+    writeBytes(b, 0, b.length);
+  }
+
+  /**
+   * Writes the remaining bytes of {@code buffer}. The argument's own position and
+   * limit are left untouched (a duplicate is consumed instead).
+   */
+  public void writeBytes(ByteBuffer buffer) {
+    buffer = buffer.duplicate();
+    int length = buffer.remaining();
+    while (length > 0) {
+      if (!currentBlock.hasRemaining()) {
+        appendBlock();
+      }
+
+      // Narrow the source window to what fits in the current block, then bulk-copy it.
+      int chunk = Math.min(currentBlock.remaining(), length);
+      buffer.limit(buffer.position() + chunk);
+      currentBlock.put(buffer);
+
+      length -= chunk;
+    }
+  }
+
+  /**
+   * Return a list of read-only view of {@link ByteBuffer} blocks over the 
+   * current content written to the output.
+   */
+  public ArrayList<ByteBuffer> toBufferList() {
+    ArrayList<ByteBuffer> result = new ArrayList<>(Math.max(blocks.size(), 1));
+    if (blocks.isEmpty()) {
+      result.add(EMPTY);
+    } else {
+      for (ByteBuffer bb : blocks) {
+        bb = (ByteBuffer) bb.asReadOnlyBuffer().flip(); // cast for jdk8 (covariant in jdk9+) 
+        result.add(bb);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Returns a list of writeable blocks over the (source) content buffers.
+   * 
+   * This method returns the raw content of source buffers that may change over the lifetime 
+   * of this object (blocks can be recycled or discarded, for example). Most applications 
+   * should favor calling {@link #toBufferList()} which returns a read-only <i>view</i> over 
+   * the content of the source buffers.
+   * 
+   * The difference between {@link #toBufferList()} and {@link #toWriteableBufferList()} is that
+   * read-only view of source buffers will always return {@code false} from {@link ByteBuffer#hasArray()}
+   * (which sometimes may be required to avoid double copying).
+   */
+  public ArrayList<ByteBuffer> toWriteableBufferList() {
+    ArrayList<ByteBuffer> result = new ArrayList<>(Math.max(blocks.size(), 1));
+    if (blocks.isEmpty()) {
+      result.add(EMPTY);
+    } else {
+      for (ByteBuffer bb : blocks) {
+        bb = (ByteBuffer) bb.duplicate().flip(); // cast for jdk8 (covariant in jdk9+) 
+        result.add(bb);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Return a {@link ByteBuffersDataInput} for the set of current buffers ({@link #toBufferList()}). 
+   */
+  public ByteBuffersDataInput toDataInput() {
+    return new ByteBuffersDataInput(toBufferList());
+  }
+
+  /**
+   * Return a contiguous array with the current content written to the output. The returned
+   * array is always a copy (can be mutated).
+   *
+   * @throws ArithmeticException if the content does not fit in a single array
+   *         (its size exceeds {@link Integer#MAX_VALUE})
+   */
+  public byte[] toArrayCopy() {
+    if (blocks.size() == 0) {
+      return EMPTY_BYTE_ARRAY;
+    }
+
+    // We could try to detect single-block, array-based ByteBuffer here
+    // and use Arrays.copyOfRange, but I don't think it's worth the extra
+    // instance checks.
+
+    byte [] arr = new byte[Math.toIntExact(size())];
+    int offset = 0;
+    for (ByteBuffer bb : toBufferList()) {
+      int len = bb.remaining();
+      bb.get(arr, offset, len);
+      offset += len;
+    }
+    return arr;
+  }  
+
+  /**
+   * Copy the current content of this object into another {@link DataOutput}.
+   */
+  public void copyTo(DataOutput output) throws IOException {
+    // Iterate over writeable views: a read-only view never exposes its backing array
+    // (hasArray() is always false), which would make the direct array copy unreachable.
+    for (ByteBuffer bb : toWriteableBufferList()) {
+      if (bb.hasArray()) {
+        output.writeBytes(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+      } else {
+        output.copyBytes(new ByteBuffersDataInput(Arrays.asList(bb.asReadOnlyBuffer())), bb.remaining());
+      }
+    }
+  }
+
+  /**
+   * @return The number of bytes written to this output so far.
+   */
+  public long size() {
+    long size = 0;
+    int blockCount = blocks.size();
+    if (blockCount >= 1) {
+      // Multiply as long: the byte count of the full blocks can exceed Integer.MAX_VALUE.
+      long fullBlockSize = (blockCount - 1L) * blockSize();
+      long lastBlockSize = blocks.getLast().position();
+      size = fullBlockSize + lastBlockSize;
+    }
+    return size;
+  }
+
+  @Override
+  public String toString() {
+    return String.format(Locale.ROOT,
+        "%,d bytes, block size: %,d, blocks: %,d",
+        size(),
+        blockSize(),
+        blocks.size());
+  }
+
+  // Specialized versions of writeXXX methods that break execution into
+  // fast/ slow path if the result would fall on the current block's 
+  // boundary.
+  // 
+  // We also remove the IOException from methods because it (theoretically)
+  // cannot be thrown from byte buffers.
+
+  @Override
+  public void writeShort(short v) {
+    try {
+      if (currentBlock.remaining() >= Short.BYTES) {
+        currentBlock.putShort(v);
+      } else {
+        // The value would straddle a block boundary: use the inherited implementation.
+        super.writeShort(v);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public void writeInt(int v) {
+    try {
+      if (currentBlock.remaining() >= Integer.BYTES) {
+        currentBlock.putInt(v);
+      } else {
+        // The value would straddle a block boundary: use the inherited implementation.
+        super.writeInt(v);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }    
+  }
+
+  @Override
+  public void writeLong(long v) {
+    try {
+      if (currentBlock.remaining() >= Long.BYTES) {
+        currentBlock.putLong(v);
+      } else {
+        // The value would straddle a block boundary: use the inherited implementation.
+        super.writeLong(v);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }    
+  }
+
+  @Override
+  public void writeString(String v) {
+    try {
+      final int MAX_CHARS_PER_WINDOW = 1024;
+      if (v.length() <= MAX_CHARS_PER_WINDOW) {
+        // Short strings: encode in one go.
+        final BytesRef utf8 = new BytesRef(v);
+        writeVInt(utf8.length);
+        writeBytes(utf8.bytes, utf8.offset, utf8.length);
+      } else {
+        // Long strings: write the pre-computed UTF-8 length, then encode through a
+        // fixed-size scratch buffer that is flushed to this output whenever it fills up.
+        writeVInt(UnicodeUtil.calcUTF16toUTF8Length(v, 0, v.length()));
+        final byte [] buf = new byte [UnicodeUtil.MAX_UTF8_BYTES_PER_CHAR * MAX_CHARS_PER_WINDOW];
+        UTF16toUTF8(v, 0, v.length(), buf, (len) -> {
+          writeBytes(buf, 0, len);
+        });
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }    
+  }
+  
+  @Override
+  public void writeMapOfStrings(Map<String, String> map) {
+    try {
+      super.writeMapOfStrings(map);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+  
+  @Override
+  public void writeSetOfStrings(Set<String> set) {
+    try {
+      super.writeSetOfStrings(set);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }      
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    // Return a rough estimation for allocated blocks. Note that we do not make
+    // any special distinction for direct memory buffers.
+    return RamUsageEstimator.NUM_BYTES_OBJECT_REF * blocks.size() + 
+           blocks.stream().mapToLong(buf -> buf.capacity()).sum();
+  }
+
+  /**
+   * This method resets this object to a clean (zero-size) state and
+   * publishes any currently allocated buffers for reuse to the reuse strategy
+   * provided in the constructor.
+   * 
+   * Sharing byte buffers for reads and writes is dangerous and will very likely
+   * lead to hard-to-debug issues, use with great care.
+   */
+  public void reset() {
+    blocks.stream().forEach(blockReuse);
+    blocks.clear();
+    currentBlock = EMPTY;
+  }
+
+  /**
+   * @return Returns a new {@link ByteBuffersDataOutput} with the {@link #reset()} capability. 
+   */
+  // TODO: perhaps we can move it out to an utility class (as a supplier of preconfigured instances?) 
+  public static ByteBuffersDataOutput newResettableInstance() {
+    ByteBuffersDataOutput.ByteBufferRecycler reuser = new ByteBuffersDataOutput.ByteBufferRecycler(
+        ByteBuffersDataOutput.ALLOCATE_BB_ON_HEAP); 
+    return new ByteBuffersDataOutput(
+        ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK,
+        ByteBuffersDataOutput.DEFAULT_MAX_BITS_PER_BLOCK,
+        reuser::allocate,
+        reuser::reuse);
+  }
+
+  /** Current block size in bytes. */
+  private int blockSize() {
+    return 1 << blockBits;
+  }
+
+  /**
+   * Makes {@link #currentBlock} writeable again: either by re-packing the existing
+   * content into larger blocks (once too many blocks have accumulated) or by
+   * allocating and appending a fresh block.
+   */
+  private void appendBlock() {
+    if (blocks.size() >= MAX_BLOCKS_BEFORE_BLOCK_EXPANSION && blockBits < maxBitsPerBlock) {
+      rewriteToBlockSize(blockBits + 1);
+      // All blocks have just been replaced; the previous currentBlock is no longer
+      // part of this output and must not receive any further writes.
+      currentBlock = blocks.getLast();
+      if (currentBlock.hasRemaining()) {
+        return;
+      }
+    }
+
+    final int requiredBlockSize = 1 << blockBits;
+    currentBlock = blockAllocate.apply(requiredBlockSize);
+    assert currentBlock.capacity() == requiredBlockSize;
+    blocks.add(currentBlock);
+  }
+
+  /**
+   * Copies all content into blocks of {@code 2^targetBlockBits} bytes and makes
+   * those the blocks of this output.
+   */
+  private void rewriteToBlockSize(int targetBlockBits) {
+    assert targetBlockBits <= maxBitsPerBlock;
+
+    // We copy over data blocks to an output with one-larger block bit size.
+    // We also discard references to blocks as we're copying to allow GC to
+    // clean up partial results in case of memory pressure.
+    ByteBuffersDataOutput cloned = new ByteBuffersDataOutput(targetBlockBits, targetBlockBits, blockAllocate, NO_REUSE);
+    ByteBuffer block;
+    while ((block = blocks.pollFirst()) != null) {
+      block.flip();
+      cloned.writeBytes(block);
+      if (blockReuse != NO_REUSE) {
+        blockReuse.accept(block);
+      }
+    }
+
+    assert blocks.isEmpty();
+    this.blockBits = targetBlockBits;
+    blocks.addAll(cloned.blocks);
+  }
+
+  /**
+   * Picks a block size (as a power of two exponent) so that {@code bytes} fit into about
+   * {@link #MAX_BLOCKS_BEFORE_BLOCK_EXPANSION} blocks, clamped to the default
+   * minimum and maximum block sizes.
+   */
+  private static int computeBlockSizeBitsFor(long bytes) {
+    long powerOfTwo = BitUtil.nextHighestPowerOfTwo(bytes / MAX_BLOCKS_BEFORE_BLOCK_EXPANSION);
+    if (powerOfTwo == 0) {
+      return DEFAULT_MIN_BITS_PER_BLOCK;
+    }
+    
+    int blockBits = Long.numberOfTrailingZeros(powerOfTwo);
+    blockBits = Math.min(blockBits, DEFAULT_MAX_BITS_PER_BLOCK);
+    blockBits = Math.max(blockBits, DEFAULT_MIN_BITS_PER_BLOCK);
+    return blockBits;
+  }
+
+  // TODO: move this block-based conversion to UnicodeUtil.
+  
+  private static final long HALF_SHIFT = 10;
+  private static final int SURROGATE_OFFSET = 
+      Character.MIN_SUPPLEMENTARY_CODE_POINT - 
+      (UnicodeUtil.UNI_SUR_HIGH_START << HALF_SHIFT) - UnicodeUtil.UNI_SUR_LOW_START;
+
+  /**
+   * A consumer-based UTF16-UTF8 encoder (writes the input string in smaller buffers.).
+   *
+   * @param s             the UTF-16 input
+   * @param offset        index of the first char to encode
+   * @param length        number of chars to encode
+   * @param buf           scratch buffer the encoded bytes are staged in
+   * @param bufferFlusher called with the number of valid bytes at the start of {@code buf}
+   *                      every time the buffer is (nearly) full, and once more at the end
+   * @return the total number of UTF-8 bytes passed to {@code bufferFlusher}
+   */
+  private static int UTF16toUTF8(final CharSequence s, 
+                                final int offset, 
+                                final int length, 
+                                byte[] buf, 
+                                IntConsumer bufferFlusher) {
+    int utf8Len = 0;
+    int j = 0;    
+    for (int i = offset, end = offset + length; i < end; i++) {
+      final int chr = (int) s.charAt(i);
+
+      // Flush before encoding if the longest sequence (4 bytes) might not fit.
+      if (j + 4 >= buf.length) {
+        bufferFlusher.accept(j);
+        utf8Len += j;
+        j = 0;
+      }
+
+      if (chr < 0x80)
+        buf[j++] = (byte) chr;
+      else if (chr < 0x800) {
+        buf[j++] = (byte) (0xC0 | (chr >> 6));
+        buf[j++] = (byte) (0x80 | (chr & 0x3F));
+      } else if (chr < 0xD800 || chr > 0xDFFF) {
+        buf[j++] = (byte) (0xE0 | (chr >> 12));
+        buf[j++] = (byte) (0x80 | ((chr >> 6) & 0x3F));
+        buf[j++] = (byte) (0x80 | (chr & 0x3F));
+      } else {
+        // A surrogate pair. Confirm valid high surrogate.
+        if (chr < 0xDC00 && (i < end - 1)) {
+          int utf32 = (int) s.charAt(i + 1);
+          // Confirm valid low surrogate and write pair.
+          if (utf32 >= 0xDC00 && utf32 <= 0xDFFF) { 
+            utf32 = (chr << 10) + utf32 + SURROGATE_OFFSET;
+            i++;
+            buf[j++] = (byte) (0xF0 | (utf32 >> 18));
+            buf[j++] = (byte) (0x80 | ((utf32 >> 12) & 0x3F));
+            buf[j++] = (byte) (0x80 | ((utf32 >> 6) & 0x3F));
+            buf[j++] = (byte) (0x80 | (utf32 & 0x3F));
+            continue;
+          }
+        }
+        // Replace unpaired surrogate or out-of-order low surrogate
+        // with substitution character.
+        buf[j++] = (byte) 0xEF;
+        buf[j++] = (byte) 0xBF;
+        buf[j++] = (byte) 0xBD;
+      }
+    }
+
+    // Flush whatever is left in the scratch buffer.
+    bufferFlusher.accept(j);
+    utf8Len += j;
+
+    return utf8Len;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java
new file mode 100644
index 0000000..acff5cf
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDirectory.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.AccessDeniedException;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.NoSuchFileException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.zip.CRC32;
+
+import org.apache.lucene.index.IndexFileNames;
+import org.apache.lucene.util.BitUtil;
+
+/**
+ * A {@link ByteBuffer}-based {@link Directory} implementation that
+ * can be used to store index files on the heap.
+ *
+ * <p>Important: Note that {@link MMapDirectory} is nearly always a better choice as
+ * it uses OS caches more effectively (through memory-mapped buffers).
+ * A heap-based directory like this one can have the advantage in case of ephemeral, small,
+ * short-lived indexes when disk syncs provide an additional overhead.</p>
+ *
+ * @lucene.experimental
+ */
+public final class ByteBuffersDirectory extends BaseDirectory {
+  /**
+   * Output-to-input conversion that reads directly from the blocks of the output
+   * (see {@link ByteBuffersDataOutput#toDataInput()}).
+   */
+  public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_MANY_BUFFERS = 
+      (fileName, output) -> {
+        ByteBuffersDataInput dataInput = output.toDataInput();
+        String inputName = String.format(Locale.ROOT, "%s (file=%s, buffers=%s)",
+            ByteBuffersIndexInput.class.getSimpleName(),
+            fileName,
+            dataInput.toString());
+        return new ByteBuffersIndexInput(dataInput, inputName);
+      };
+
+  /**
+   * Output-to-input conversion that copies the output into one contiguous array
+   * and reads it through a single wrapping {@link ByteBuffer}.
+   */
+  public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_ONE_BUFFER = 
+      (fileName, output) -> {
+        ByteBuffersDataInput dataInput = new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(output.toArrayCopy())));
+        String inputName = String.format(Locale.ROOT, "%s (file=%s, buffers=%s)",
+            ByteBuffersIndexInput.class.getSimpleName(),
+            fileName,
+            dataInput.toString());
+        return new ByteBuffersIndexInput(dataInput, inputName);
+      };
+
+  /**
+   * Output-to-input conversion that copies the output into one contiguous array
+   * read by a {@link ByteArrayIndexInput}.
+   */
+  public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_BYTE_ARRAY = 
+      (fileName, output) -> {
+        byte[] array = output.toArrayCopy();
+        String inputName = String.format(Locale.ROOT, "%s (file=%s, length=%s)",
+            ByteArrayIndexInput.class.getSimpleName(),
+            fileName,
+            array.length);
+        return new ByteArrayIndexInput(inputName, array, 0, array.length);
+      };
+
+  /**
+   * Output-to-input conversion that hands the blocks of the output over to
+   * {@link ByteBufferIndexInput} (with a no-op buffer guard).
+   */
+  public static final BiFunction<String, ByteBuffersDataOutput, IndexInput> OUTPUT_AS_MANY_BUFFERS_LUCENE = 
+      (fileName, output) -> {
+        List<ByteBuffer> bufferList = output.toBufferList();
+        int chunkSizePower;
+        // NOTE(review): a trailing zero-length buffer is appended before the list is passed
+        // on; presumably ByteBufferIndexInput expects it -- confirm against that class.
+        bufferList.add(ByteBuffer.allocate(0));
+        int blockSize = ByteBuffersDataInput.determineBlockPage(bufferList);
+        if (blockSize == 0) {
+          chunkSizePower = 30;
+        } else {
+          chunkSizePower = Integer.numberOfTrailingZeros(BitUtil.nextHighestPowerOfTwo(blockSize));
+        }
+
+        String inputName = String.format(Locale.ROOT, "%s (file=%s)",
+            ByteBuffersDirectory.class.getSimpleName(),
+            fileName);
+
+        ByteBufferGuard guard = new ByteBufferGuard("none", (String resourceDescription, ByteBuffer b) -> {});
+        return ByteBufferIndexInput.newInstance(inputName, 
+            bufferList.toArray(new ByteBuffer [bufferList.size()]), 
+            output.size(), chunkSizePower, guard);
+      };
+
+  /**
+   * Generates the variable part of temporary file names: the given suffix followed by
+   * an ever-increasing counter (in radix {@link Character#MAX_RADIX}).
+   */
+  private final Function<String, String> tempFileName = new Function<String, String>() {
+    private final AtomicLong counter = new AtomicLong();
+
+    @Override
+    public String apply(String suffix) {
+      return suffix + "_" + Long.toString(counter.getAndIncrement(), Character.MAX_RADIX);
+    }
+  };
+
+  /** All files of this directory, by name. */
+  private final ConcurrentHashMap<String, FileEntry> files = new ConcurrentHashMap<>();
+
+  /**
+   * Conversion between a buffered index output and the corresponding index input
+   * for a given file.   
+   */
+  private final BiFunction<String, ByteBuffersDataOutput, IndexInput> outputToInput;
+
+  /**
+   * A supplier of {@link ByteBuffersDataOutput} instances used to buffer up 
+   * the content of written files.
+   */
+  private final Supplier<ByteBuffersDataOutput> bbOutputSupplier;
+
+  /**
+   * Creates a directory using a new {@link SingleInstanceLockFactory}.
+   */
+  public ByteBuffersDirectory() {
+    this(new SingleInstanceLockFactory());
+  }
+  
+  /**
+   * Creates a directory that buffers files in default {@link ByteBuffersDataOutput}
+   * instances and reads them back with {@link #OUTPUT_AS_MANY_BUFFERS}.
+   */
+  public ByteBuffersDirectory(LockFactory lockFactory) {
+    this(lockFactory, ByteBuffersDataOutput::new, OUTPUT_AS_MANY_BUFFERS);
+  }
+
+  /**
+   * @param factory          the lock factory passed on to {@link BaseDirectory}
+   * @param bbOutputSupplier supplies the buffer each new file is written to; must not be {@code null}
+   * @param outputToInput    converts a completely written buffer into the {@link IndexInput}
+   *                         served to readers; must not be {@code null}
+   */
+  public ByteBuffersDirectory(LockFactory factory, 
+      Supplier<ByteBuffersDataOutput> bbOutputSupplier,
+      BiFunction<String, ByteBuffersDataOutput, IndexInput> outputToInput) {
+    super(factory);
+    this.outputToInput = Objects.requireNonNull(outputToInput);
+    this.bbOutputSupplier = Objects.requireNonNull(bbOutputSupplier);
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    ensureOpen();
+    return files.keySet().stream().sorted().toArray(String[]::new);
+  }
+
+  @Override
+  public void deleteFile(String name) throws IOException {
+    ensureOpen();
+    FileEntry removed = files.remove(name);
+    if (removed == null) {
+      throw new FileNotFoundException(name);
+    }
+  }
+
+  @Override
+  public long fileLength(String name) throws IOException {
+    ensureOpen();
+    FileEntry file = files.get(name);
+    if (file == null) {
+      throw new FileNotFoundException(name);
+    }
+    return file.length();
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    ensureOpen();
+    FileEntry e = new FileEntry(name); 
+    if (files.putIfAbsent(name, e) != null) {
+      throw new FileAlreadyExistsException("File already exists: " + name);
+    }
+    return e.createOutput(outputToInput);
+  }
+
+  @Override
+  public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
+    ensureOpen();
+    // Keep generating names until one is not taken yet.
+    while (true) {
+      String name = IndexFileNames.segmentFileName(prefix, tempFileName.apply(suffix), "tmp");
+      FileEntry e = new FileEntry(name); 
+      if (files.putIfAbsent(name, e) == null) {
+        return e.createOutput(outputToInput);
+      }
+    }
+  }
+
+  @Override
+  public void rename(String source, String dest) throws IOException {
+    ensureOpen();
+
+    FileEntry file = files.get(source);
+    if (file == null) {
+      throw new FileNotFoundException(source);
+    }
+    if (files.putIfAbsent(dest, file) != null) {
+      throw new FileAlreadyExistsException(dest);
+    }
+    // Conditional removal: only drop the source mapping if it still points to the entry
+    // we have just published under the destination name. No unconditional removal must
+    // follow, or a file concurrently re-created under the source name could be deleted.
+    if (!files.remove(source, file)) {
+      throw new IllegalStateException("File was unexpectedly replaced: " + source);
+    }
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    ensureOpen();
+  }
+
+  @Override
+  public void syncMetaData() throws IOException {
+    ensureOpen();
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    ensureOpen();
+    FileEntry e = files.get(name);
+    if (e == null) {
+      throw new NoSuchFileException(name);
+    } else {
+      return e.openInput();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    isOpen = false;
+    files.clear();
+  }
+
+  /**
+   * A single file: empty (and not readable) until the output created by
+   * {@link #createOutput} reports completion, immutable afterwards.
+   */
+  private final class FileEntry {
+    private final String fileName;
+
+    /** The input clones are created from; {@code null} until the file has been written. */
+    private volatile IndexInput content;
+    private volatile long cachedLength;
+
+    public FileEntry(String name) {
+      this.fileName = name;
+    }
+
+    public long length() {
+      // We return 0 length until the IndexOutput is closed and flushed.
+      return cachedLength;
+    }
+
+    /**
+     * @return a clone of the file's content
+     * @throws AccessDeniedException if the file has not been completely written yet
+     */
+    public IndexInput openInput() throws IOException {
+      IndexInput local = this.content;
+      if (local == null) {
+        throw new AccessDeniedException("Can't open a file still open for writing: " + fileName);
+      }
+
+      return local.clone();
+    }
+
+    /**
+     * Creates the (only) output for this file. The file's content and length become
+     * visible when the output invokes its completion callback.
+     *
+     * @throws IOException if the file has been written already
+     */
+    final IndexOutput createOutput(BiFunction<String, ByteBuffersDataOutput, IndexInput> outputToInput) throws IOException {
+      if (content != null) {
+        throw new IOException("Can only write to a file once: " + fileName);
+      }
+
+      String clazzName = ByteBuffersDirectory.class.getSimpleName();
+      String outputName = String.format(Locale.ROOT, "%s output (file=%s)", clazzName, fileName);
+
+      return new ByteBuffersIndexOutput(
+          bbOutputSupplier.get(), outputName, fileName,
+          new CRC32(),
+          (output) -> {
+            IndexInput input = outputToInput.apply(fileName, output);
+            // Publish the length first: once the content is visible (and the file can be
+            // opened), length() must not report the initial zero any more.
+            cachedLength = output.size();
+            content = input;
+          });
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java
new file mode 100644
index 0000000..7c87d24
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RandomAccessInput;
+
+/**
+ * An {@link IndexInput} implementing {@link RandomAccessInput} and backed
+ * by a {@link ByteBuffersDataInput}.
+ */
+public final class ByteBuffersIndexInput extends IndexInput implements RandomAccessInput {
+  /** The backing input; {@code null} once this input has been closed. */
+  private ByteBuffersDataInput in;
+
+  /**
+   * @param in                  the input all reads are delegated to
+   * @param resourceDescription the description passed on to {@link IndexInput}
+   */
+  public ByteBuffersIndexInput(ByteBuffersDataInput in, String resourceDescription) {
+    super(resourceDescription);
+    this.in = in;
+  }
+
+  /** Drops the reference to the backing input; any later access fails as "already closed". */
+  @Override
+  public void close() throws IOException {
+    in = null;
+  }
+
+  @Override
+  public long getFilePointer() {
+    return openDelegate().position();
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    openDelegate().seek(pos);
+  }
+
+  @Override
+  public long length() {
+    return openDelegate().size();
+  }
+
+  /**
+   * Returns an input over the given range of this input. Note that
+   * {@code sliceDescription} is not part of the returned input's description.
+   */
+  @Override
+  public ByteBuffersIndexInput slice(String sliceDescription, long offset, long length) throws IOException {
+    final ByteBuffersDataInput sliced = openDelegate().slice(offset, length);
+    final String description = "(sliced) offset=" + offset + ", length=" + length + " " + toString();
+    return new ByteBuffersIndexInput(sliced, description);
+  }
+
+  @Override
+  public byte readByte() throws IOException {
+    return openDelegate().readByte();
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len) throws IOException {
+    openDelegate().readBytes(b, offset, len);
+  }
+
+  @Override
+  public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
+    openDelegate();
+    return slice("", offset, length);
+  }
+
+  @Override
+  public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
+    openDelegate().readBytes(b, offset, len, useBuffer);
+  }
+
+  @Override
+  public short readShort() throws IOException {
+    return openDelegate().readShort();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    return openDelegate().readInt();
+  }
+
+  @Override
+  public int readVInt() throws IOException {
+    return openDelegate().readVInt();
+  }
+
+  @Override
+  public int readZInt() throws IOException {
+    return openDelegate().readZInt();
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    return openDelegate().readLong();
+  }
+
+  @Override
+  public long readVLong() throws IOException {
+    return openDelegate().readVLong();
+  }
+
+  @Override
+  public long readZLong() throws IOException {
+    return openDelegate().readZLong();
+  }
+
+  @Override
+  public String readString() throws IOException {
+    return openDelegate().readString();
+  }
+
+  @Override
+  public Map<String, String> readMapOfStrings() throws IOException {
+    return openDelegate().readMapOfStrings();
+  }
+
+  @Override
+  public Set<String> readSetOfStrings() throws IOException {
+    return openDelegate().readSetOfStrings();
+  }
+
+  @Override
+  public void skipBytes(long numBytes) throws IOException {
+    openDelegate();
+    super.skipBytes(numBytes);
+  }
+
+  @Override
+  public byte readByte(long pos) throws IOException {
+    return openDelegate().readByte(pos);
+  }
+
+  @Override
+  public short readShort(long pos) throws IOException {
+    return openDelegate().readShort(pos);
+  }
+
+  @Override
+  public int readInt(long pos) throws IOException {
+    return openDelegate().readInt(pos);
+  }
+
+  @Override
+  public long readLong(long pos) throws IOException {
+    return openDelegate().readLong(pos);
+  }
+  
+  /**
+   * Returns an independent input over the same content, positioned at this
+   * input's current file pointer.
+   */
+  @Override
+  public IndexInput clone() {
+    final ByteBuffersDataInput source = openDelegate();
+    final ByteBuffersDataInput everything = source.slice(0, source.size());
+    final ByteBuffersIndexInput copy = new ByteBuffersIndexInput(everything, "(clone of) " + toString());
+    try {
+      copy.seek(source.position());
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    return copy;
+  }
+
+  /**
+   * @return the backing input
+   * @throws AlreadyClosedException if this input has been closed
+   */
+  private ByteBuffersDataInput openDelegate() {
+    final ByteBuffersDataInput delegate = in;
+    if (delegate != null) {
+      return delegate;
+    }
+    throw new AlreadyClosedException("Already closed.");
+  }  
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java
new file mode 100644
index 0000000..19dc400
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexOutput.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+/**
+ * An {@link IndexOutput} writing to a {@link ByteBuffersDataOutput}.
+ *
+ * <p>Every write is forwarded to the delegate. The checksum is not maintained incrementally:
+ * {@link #getChecksum()} recomputes it over the delegate's buffers whenever the delegate's size
+ * differs from the size seen at the previous computation.
+ *
+ * <p>No effort is made to be thread-safe.
+ */
+public final class ByteBuffersIndexOutput extends IndexOutput {
+  /** Callback receiving the delegate on the first {@link #close()}; may be {@code null}. */
+  private final Consumer<ByteBuffersDataOutput> onClose;
+  
+  /** Checksum implementation; when {@code null}, {@link #getChecksum()} throws. */
+  private final Checksum checksum;
+
+  /**
+   * Size of the delegate at the time {@link #lastChecksum} was computed, or {@code -1} when no
+   * checksum has been computed yet. Starting at {@code -1} rather than {@code 0} forces the first
+   * {@link #getChecksum()} call to run the checksum even over empty output, so implementations
+   * whose value for empty input is not zero (for example Adler-32) are reported correctly.
+   */
+  private long lastChecksumPosition = -1;
+
+  /** Checksum value cached for {@link #lastChecksumPosition}. */
+  private long lastChecksum;
+
+  /** The wrapped output; set to {@code null} by {@link #close()}. */
+  private ByteBuffersDataOutput delegate;
+
+  /**
+   * Creates an output that computes CRC32 checksums and has no close callback.
+   *
+   * @param delegate the output that receives all writes.
+   * @param resourceDescription passed to the {@link IndexOutput} constructor.
+   * @param name passed to the {@link IndexOutput} constructor.
+   */
+  public ByteBuffersIndexOutput(ByteBuffersDataOutput delegate, String resourceDescription, String name) {
+    this(delegate, resourceDescription, name, new CRC32(), null);
+  }
+
+  /**
+   * Creates an output with an explicit checksum implementation and close callback.
+   *
+   * @param delegate the output that receives all writes.
+   * @param resourceDescription passed to the {@link IndexOutput} constructor.
+   * @param name passed to the {@link IndexOutput} constructor.
+   * @param checksum checksum implementation, or {@code null} to make {@link #getChecksum()} fail.
+   * @param onClose invoked with the delegate when this output is first closed, or {@code null}.
+   */
+  public ByteBuffersIndexOutput(ByteBuffersDataOutput delegate, String resourceDescription, String name, 
+      Checksum checksum,
+      Consumer<ByteBuffersDataOutput> onClose) {
+    super(resourceDescription, name);
+    this.delegate = delegate;
+    this.checksum = checksum;
+    this.onClose = onClose;
+  }
+
+  /**
+   * Releases the delegate and, on the first call only, hands it to the close callback (if any).
+   * Subsequent calls do nothing.
+   */
+  @Override
+  public void close() throws IOException {
+    // No special effort to be thread-safe here since IndexOutputs are not required to be thread-safe.
+    ByteBuffersDataOutput local = delegate;
+    delegate = null;
+    if (local != null && onClose != null) {
+      onClose.accept(local);
+    }
+  }
+
+  /** Returns the current size of the delegate. */
+  @Override
+  public long getFilePointer() {
+    ensureOpen();
+    return delegate.size();
+  }
+
+  /**
+   * Returns the checksum of everything written so far.
+   *
+   * <p>The checksum is computed on the current content of the delegate rather than being updated
+   * on every write. This way the write methods can be passed directly to the delegate for
+   * efficiency, while the checksum is still correct for the current content of the output buffer
+   * (IndexOutput is per-thread, so no concurrent changes).
+   *
+   * @throws IOException if this output was created without a checksum implementation.
+   * @throws AlreadyClosedException if this output has been closed.
+   */
+  @Override
+  public long getChecksum() throws IOException {
+    ensureOpen();
+
+    if (checksum == null) {
+      throw new IOException("This index output has no checksum computing ability: " + toString());
+    }
+
+    final long size = delegate.size();
+    if (lastChecksumPosition != size) {
+      checksum.reset();
+      byte[] scratch = null;
+      for (ByteBuffer bb : delegate.toBufferList()) {
+        // NOTE(review): ByteBuffer.hasArray() is false for read-only buffers; if toBufferList()
+        // only ever returns read-only views, this fast path is never taken - confirm.
+        if (bb.hasArray()) {
+          checksum.update(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
+        } else {
+          if (scratch == null) {
+            scratch = new byte[1024 * 4];
+          }
+
+          // Drain a separate view so the position of the listed buffer is left untouched.
+          ByteBuffer view = bb.asReadOnlyBuffer();
+          int remaining = view.remaining();
+          while (remaining > 0) {
+            int len = Math.min(remaining, scratch.length);
+            view.get(scratch, 0, len);
+            checksum.update(scratch, 0, len);
+            remaining -= len;
+          }
+        }
+      }
+      lastChecksum = checksum.getValue();
+      // Record the position only after a successful computation so that a failure above
+      // cannot leave a stale value cached for this size.
+      lastChecksumPosition = size;
+    }
+    return lastChecksum;
+  }
+
+  @Override
+  public void writeByte(byte b) throws IOException {
+    ensureOpen();
+    delegate.writeByte(b);
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int offset, int length) throws IOException {
+    ensureOpen();
+    delegate.writeBytes(b, offset, length);
+  }
+
+  @Override
+  public void writeBytes(byte[] b, int length) throws IOException {
+    ensureOpen();
+    delegate.writeBytes(b, length);
+  }
+
+  @Override
+  public void writeInt(int i) throws IOException {
+    ensureOpen();
+    delegate.writeInt(i);
+  }
+
+  @Override
+  public void writeShort(short i) throws IOException {
+    ensureOpen();
+    delegate.writeShort(i);
+  }
+
+  @Override
+  public void writeLong(long i) throws IOException {
+    ensureOpen();
+    delegate.writeLong(i);
+  }
+
+  @Override
+  public void writeString(String s) throws IOException {
+    ensureOpen();
+    delegate.writeString(s);
+  }
+
+  @Override
+  public void copyBytes(DataInput input, long numBytes) throws IOException {
+    ensureOpen();
+    delegate.copyBytes(input, numBytes);
+  }
+
+  @Override
+  public void writeMapOfStrings(Map<String, String> map) throws IOException {
+    ensureOpen();
+    delegate.writeMapOfStrings(map);
+  }
+
+  @Override
+  public void writeSetOfStrings(Set<String> set) throws IOException {
+    ensureOpen();
+    delegate.writeSetOfStrings(set);
+  }
+
+  /**
+   * Verifies that {@link #close()} has not been called yet.
+   *
+   * @throws AlreadyClosedException if the delegate has already been released.
+   */
+  private void ensureOpen() {
+    if (delegate == null) {
+      throw new AlreadyClosedException("Already closed.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java b/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java
new file mode 100644
index 0000000..4578a4f
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/BaseDataOutputTestCase.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils.IOConsumer;
+import org.junit.Test;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom;
+import com.carrotsearch.randomizedtesting.generators.RandomBytes;
+import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
+import com.carrotsearch.randomizedtesting.generators.RandomPicks;
+import com.carrotsearch.randomizedtesting.generators.RandomStrings;
+
+/**
+ * Base class for tests of {@link DataOutput} implementations.
+ *
+ * <p>It compares the bytes produced by the implementation under test with the bytes produced by
+ * an {@link OutputStreamDataOutput} when both are fed the same pseudo-random sequence of writes,
+ * and offers {@link #addRandomData} to other tests that need matching write/verify pairs.
+ *
+ * @param <T> the {@link DataOutput} implementation under test.
+ */
+public abstract class BaseDataOutputTestCase<T extends DataOutput> extends RandomizedTest {
+  /** Creates a fresh, empty instance of the output under test. */
+  protected abstract T newInstance();
+
+  /** Returns everything written to {@code instance} so far as a byte array. */
+  protected abstract byte[] toBytes(T instance);
+  
+  /** A two-argument function that is allowed to throw checked exceptions. */
+  @FunctionalInterface
+  private interface ThrowingBiFunction<T, U, R> {
+    R apply(T t, U u) throws Exception;
+  }
+
+  /**
+   * Writes the same random sequence to the output under test and to a reference stream-backed
+   * output, then requires both byte sequences to be identical.
+   */
+  @Test
+  public void testRandomizedWrites() throws IOException {
+    T dst = newInstance();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput ref = new OutputStreamDataOutput(baos);
+
+    // Both generators start from the same seed, so both outputs receive identical calls.
+    long seed = randomLong();
+    int max = 50_000;
+    addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
+    addRandomData(ref, new Xoroshiro128PlusRandom(seed), max);
+    assertArrayEquals(baos.toByteArray(), toBytes(dst));
+  }
+  
+  /**
+   * Performs {@code maxAddCalls} randomly chosen write operations on {@code dst}.
+   *
+   * @param dst the output to write to.
+   * @param rnd source of randomness for picking operations and generating values.
+   * @param maxAddCalls number of write operations to perform.
+   * @return one verifier per write, in write order; each verifier reads the corresponding value
+   *         back from a {@link DataInput} and asserts that it equals what was written.
+   * @throws IOException if a write fails; an {@link IOException} thrown by a generator is
+   *         propagated as-is, any other exception is wrapped.
+   */
+  protected static List<IOConsumer<DataInput>> addRandomData(DataOutput dst, Random rnd, int maxAddCalls) throws IOException {
+    try {
+      List<IOConsumer<DataInput>> reply = new ArrayList<>();
+      for (int i = 0; i < maxAddCalls; i++) {
+        reply.add(RandomPicks.randomFrom(rnd, GENERATORS).apply(dst, rnd));
+      }
+      return reply;
+    } catch (IOException e) {
+      // Already the declared type: do not bury it under another IOException.
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * The available write operations. Each one writes a random value to the output and returns a
+   * verifier that reads the value back and asserts equality.
+   */
+  private static final List<ThrowingBiFunction<DataOutput, Random, IOConsumer<DataInput>>> GENERATORS = new ArrayList<>();
+  static {
+    // writeByte/ readByte
+    GENERATORS.add((dst, rnd) -> {
+        byte value = (byte) rnd.nextInt();
+        dst.writeByte(value);
+        return (src) -> assertEquals("readByte()", value, src.readByte());
+      });
+
+    // writeBytes/ readBytes (array and buffer version).
+    GENERATORS.add((dst, rnd) -> {
+        byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100);
+        ByteBuffersDataOutput rdo = dst instanceof ByteBuffersDataOutput ? (ByteBuffersDataOutput) dst : null;
+
+        // The boolean is drawn unconditionally so that the random sequence stays the same
+        // regardless of the concrete type of dst.
+        if (rnd.nextBoolean() && rdo != null) {
+          rdo.writeBytes(ByteBuffer.wrap(bytes));
+        } else {
+          dst.writeBytes(bytes, bytes.length);
+        }
+
+        boolean useBuffersForRead = rnd.nextBoolean();
+        return (src) -> {
+          byte [] read = new byte [bytes.length];
+          if (useBuffersForRead && src instanceof ByteBuffersDataInput) {
+            ((ByteBuffersDataInput) src).readBytes(ByteBuffer.wrap(read), read.length);
+            assertArrayEquals("readBytes(ByteBuffer)", bytes, read);
+          } else {
+            src.readBytes(read, 0, read.length);
+            assertArrayEquals("readBytes(byte[])", bytes, read);
+          }
+        };
+      }
+    );
+
+    // writeBytes/ readBytes (array + offset).
+    GENERATORS.add((dst, rnd) -> {
+        byte[] bytes = RandomBytes.randomBytesOfLengthBetween(rnd, 0, 100);
+        int off = RandomNumbers.randomIntBetween(rnd, 0, bytes.length);
+        int len = RandomNumbers.randomIntBetween(rnd, 0, bytes.length - off); 
+        dst.writeBytes(bytes, off, len);
+
+        return (src) -> {
+          byte [] read = new byte [bytes.length + off];
+          src.readBytes(read, off, len);
+          assertArrayEquals(
+              "readBytes(byte[], off)",
+              ArrayUtil.copyOfSubArray(bytes, off, len + off),
+              ArrayUtil.copyOfSubArray(read, off, len + off));
+        };
+      }
+    );
+    
+    // writeInt/ readInt
+    GENERATORS.add((dst, rnd) -> {
+      int v = rnd.nextInt(); 
+      dst.writeInt(v); 
+      return (src) -> assertEquals("readInt()", v, src.readInt());
+    });
+
+    // writeLong/ readLong
+    GENERATORS.add((dst, rnd) -> {
+      long v = rnd.nextLong(); 
+      dst.writeLong(v); 
+      return (src) -> assertEquals("readLong()", v, src.readLong());
+    });
+    
+    // writeShort/ readShort
+    GENERATORS.add((dst, rnd) -> {
+      short v = (short) rnd.nextInt(); 
+      dst.writeShort(v); 
+      return (src) -> assertEquals("readShort()", v, src.readShort());
+    });
+
+    // writeVInt/ readVInt
+    GENERATORS.add((dst, rnd) -> {
+      int v = rnd.nextInt(); 
+      dst.writeVInt(v); 
+      return (src) -> assertEquals("readVInt()", v, src.readVInt());
+    });
+
+    // writeZInt/ readZInt
+    GENERATORS.add((dst, rnd) -> {
+      int v = rnd.nextInt(); 
+      dst.writeZInt(v); 
+      return (src) -> assertEquals("readZInt()", v, src.readZInt());
+    });
+
+    // writeVLong/ readVLong; the sign bit is masked off so that only non-negative values are written.
+    GENERATORS.add((dst, rnd) -> {
+      long v = rnd.nextLong() & (-1L >>> 1); 
+      dst.writeVLong(v); 
+      return (src) -> assertEquals("readVLong()", v, src.readVLong());
+    });
+
+    // writeZLong/ readZLong
+    GENERATORS.add((dst, rnd) -> {
+      long v = rnd.nextLong(); 
+      dst.writeZLong(v); 
+      return (src) -> assertEquals("readZLong()", v, src.readZLong());
+    });
+
+    // writeString/ readString
+    GENERATORS.add((dst, rnd) -> {
+      String v;
+      if (rnd.nextInt(50) == 0) {
+        // Occasionally a large blob.
+        v = RandomStrings.randomUnicodeOfLength(rnd, RandomNumbers.randomIntBetween(rnd, 2048, 4096));
+      } else {
+        v = RandomStrings.randomUnicodeOfLength(rnd, RandomNumbers.randomIntBetween(rnd, 0, 10));
+      }
+      dst.writeString(v);
+      return (src) -> assertEquals("readString()", v, src.readString());
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
new file mode 100644
index 0000000..5d3d7f6
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataInput.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import static org.junit.Assert.*;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.IOUtils.IOConsumer;
+import org.apache.lucene.util.LuceneTestCase;
+import org.junit.Test;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.Xoroshiro128PlusRandom;
+import com.carrotsearch.randomizedtesting.annotations.Timeout;
+
+public final class TestByteBuffersDataInput extends RandomizedTest {
+  @Test
+  public void testSanity() throws IOException {
+    ByteBuffersDataOutput out = new ByteBuffersDataOutput();
+    ByteBuffersDataInput o1 = out.toDataInput();
+    assertEquals(0, o1.size());
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+        o1.readByte();
+    });
+
+    out.writeByte((byte) 1);
+
+    ByteBuffersDataInput o2 = out.toDataInput();
+    assertEquals(1, o2.size());
+    assertEquals(0, o2.position());
+    assertEquals(0, o1.size());
+
+    assertTrue(o2.ramBytesUsed() > 0);
+    assertEquals(1, o2.readByte());
+    assertEquals(1, o2.position());
+    assertEquals(1, o2.readByte(0));
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+        o2.readByte();
+    });
+
+    assertEquals(1, o2.position());
+  }
+
+  @Test
+  public void testRandomReads() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+
+    long seed = randomLong();
+    int max = 1_000_000;
+    List<IOConsumer<DataInput>> reply = 
+        TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
+
+    ByteBuffersDataInput src = dst.toDataInput();
+    for (IOConsumer<DataInput> c : reply) {
+      c.accept(src);
+    }
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      src.readByte();
+    });
+  }
+
+  @Test
+  public void testRandomReadsOnSlices() throws Exception {
+    for (int reps = randomIntBetween(1, 200); --reps > 0;) {
+      ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+  
+      byte [] prefix = new byte [randomIntBetween(0, 1024 * 8)];
+      dst.writeBytes(prefix);
+  
+      long seed = randomLong();
+      int max = 10_000;
+      List<IOConsumer<DataInput>> reply = 
+          TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
+  
+      byte [] suffix = new byte [randomIntBetween(0, 1024 * 8)];
+      dst.writeBytes(suffix);
+      
+      ByteBuffersDataInput src = dst.toDataInput().slice(prefix.length, dst.size() - prefix.length - suffix.length);
+  
+      assertEquals(0, src.position());
+      assertEquals(dst.size() - prefix.length - suffix.length, src.size());
+      for (IOConsumer<DataInput> c : reply) {
+        c.accept(src);
+      }
+  
+      LuceneTestCase.expectThrows(EOFException.class, () -> {
+        src.readByte();
+      });
+    }
+  }
+
+  @Test
+  public void testSeekEmpty() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    ByteBuffersDataInput in = dst.toDataInput();
+    in.seek(0);
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      in.seek(1);
+    });
+
+    in.seek(0);
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      in.readByte();
+    });
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    for (int reps = randomIntBetween(1, 200); --reps > 0;) {
+      ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+  
+      byte [] prefix = {};
+      if (randomBoolean()) {
+        prefix = new byte [randomIntBetween(1, 1024 * 8)];
+        dst.writeBytes(prefix);
+      }
+  
+      long seed = randomLong();
+      int max = 1000;
+      List<IOConsumer<DataInput>> reply = 
+          TestByteBuffersDataOutput.addRandomData(dst, new Xoroshiro128PlusRandom(seed), max);
+  
+      ByteBuffersDataInput in = dst.toDataInput().slice(prefix.length, dst.size() - prefix.length);
+  
+      in.seek(0);
+      for (IOConsumer<DataInput> c : reply) {
+        c.accept(in);
+      }
+  
+      in.seek(0);
+      for (IOConsumer<DataInput> c : reply) {
+        c.accept(in);
+      }
+  
+      byte [] array = dst.toArrayCopy();
+      array = ArrayUtil.copyOfSubArray(array, prefix.length, array.length);
+      for (int i = 0; i < 1000; i++) {
+        int offs = randomIntBetween(0, array.length - 1);
+        in.seek(offs);
+        assertEquals(offs, in.position());
+        assertEquals(array[offs], in.readByte());
+      }
+      in.seek(in.size());
+      assertEquals(in.size(), in.position());
+      LuceneTestCase.expectThrows(EOFException.class, () -> {
+        in.readByte();
+      });
+    }
+  }
+
+  @Test
+  public void testSlicingWindow() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    assertEquals(0, dst.toDataInput().slice(0, 0).size());;
+
+    dst.writeBytes(randomBytesOfLength(1024 * 8));
+    ByteBuffersDataInput in = dst.toDataInput();
+    for (int offset = 0, max = (int) dst.size(); offset < max; offset++) {
+      assertEquals(0, in.slice(offset, 0).size());
+      assertEquals(1, in.slice(offset, 1).size());
+      
+      int window = Math.min(max - offset, 1024);
+      assertEquals(window, in.slice(offset, window).size());
+    }
+    assertEquals(0, in.slice((int) dst.size(), 0).size());
+  }
+
+  @Test
+  @Timeout(millis = 5000)
+  public void testEofOnArrayReadPastBufferSize() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    dst.writeBytes(new byte [10]);
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      ByteBuffersDataInput in = dst.toDataInput();
+      in.readBytes(new byte [100], 0, 100);
+    });
+
+    LuceneTestCase.expectThrows(EOFException.class, () -> {
+      ByteBuffersDataInput in = dst.toDataInput();
+      in.readBytes(ByteBuffer.allocate(100), 100);
+    });
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java
new file mode 100644
index 0000000..893aa37
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDataOutput.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests of {@link ByteBuffersDataOutput}, in addition to the generic write tests inherited from
+ * {@link BaseDataOutputTestCase}.
+ */
+public final class TestByteBuffersDataOutput extends BaseDataOutputTestCase<ByteBuffersDataOutput> {
+  @Override
+  protected ByteBuffersDataOutput newInstance() {
+    return new ByteBuffersDataOutput();
+  }
+  
+  @Override
+  protected byte[] toBytes(ByteBuffersDataOutput instance) {
+    return instance.toArrayCopy();
+  }
+
+  /**
+   * After {@code reset()}, writing the same data again must not request any new buffers from
+   * the allocator and must produce the same bytes.
+   */
+  @Test
+  public void testReuse() throws IOException {
+    // Counts how many times the recycler had to fall back to a real allocation.
+    AtomicInteger allocations = new AtomicInteger(0);
+    ByteBuffersDataOutput.ByteBufferRecycler reuser = new ByteBuffersDataOutput.ByteBufferRecycler(
+        (size) -> {
+          allocations.incrementAndGet();
+          return ByteBuffer.allocate(size);
+        });
+    
+    ByteBuffersDataOutput o = new ByteBuffersDataOutput(
+        ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK,
+        ByteBuffersDataOutput.DEFAULT_MAX_BITS_PER_BLOCK, 
+        reuser::allocate,
+        reuser::reuse);
+
+    // Add some random data first.
+    long genSeed = randomLong();
+    int addCount = randomIntBetween(1000, 5000);
+    addRandomData(o, new Random(genSeed), addCount);
+    byte[] data = o.toArrayCopy();
+
+    // Use the same sequence over reused instance.
+    final int expectedAllocationCount = allocations.get();
+    o.reset();
+    addRandomData(o, new Random(genSeed), addCount);
+
+    assertEquals(expectedAllocationCount, allocations.get());
+    assertArrayEquals(data, o.toArrayCopy());
+  }
+
+  /** Checks the capacity of the first block for an expected size of zero and for a large one. */
+  @Test
+  public void testConstructorWithExpectedSize() {
+    {
+      ByteBuffersDataOutput o = new ByteBuffersDataOutput(0);
+      o.writeByte((byte) 0);
+      assertEquals(1 << ByteBuffersDataOutput.DEFAULT_MIN_BITS_PER_BLOCK, o.toBufferList().get(0).capacity());
+    }
+
+    {
+      long MB = 1024 * 1024;
+      long expectedSize = randomLongBetween(MB, MB * 1024);
+      ByteBuffersDataOutput o = new ByteBuffersDataOutput(expectedSize);
+      o.writeByte((byte) 0);
+      int cap = o.toBufferList().get(0).capacity();
+      // Widen to long before multiplying so that the products cannot overflow int arithmetic.
+      long halfCapTotal = (long) (cap >> 1) * ByteBuffersDataOutput.MAX_BLOCKS_BEFORE_BLOCK_EXPANSION;
+      long fullCapTotal = (long) cap * ByteBuffersDataOutput.MAX_BLOCKS_BEFORE_BLOCK_EXPANSION;
+      // Half the chosen block size must be too small for the expected size...
+      assertTrue("cap=" + cap + ", exp=" + expectedSize, halfCapTotal < expectedSize);
+      // ...while the chosen block size itself must be large enough.
+      assertTrue("cap=" + cap + ", exp=" + expectedSize, fullCapTotal >= expectedSize);
+    }
+  }
+
+  /** Size, memory accounting and content after a couple of simple writes. */
+  @Test
+  public void testSanity() {
+    ByteBuffersDataOutput o = newInstance();
+    assertEquals(0, o.size());
+    assertEquals(0, o.toArrayCopy().length);
+    assertEquals(0, o.ramBytesUsed());
+
+    o.writeByte((byte) 1);
+    assertEquals(1, o.size());
+    assertTrue(o.ramBytesUsed() > 0);
+    assertArrayEquals(new byte [] { 1 }, o.toArrayCopy());
+
+    o.writeBytes(new byte [] {2, 3, 4}, 3);
+    assertEquals(4, o.size());
+    assertArrayEquals(new byte [] { 1, 2, 3, 4 }, o.toArrayCopy());    
+  }
+
+  /** Writing a {@link ByteBuffer} must copy exactly the bytes between its position and limit. */
+  @Test
+  public void testWriteByteBuffer() {
+    ByteBuffersDataOutput o = new ByteBuffersDataOutput();
+    byte[] bytes = randomBytesOfLength(1024 * 8 + 10);
+    ByteBuffer src = ByteBuffer.wrap(bytes);
+    int offset = randomIntBetween(0, 100);
+    int len = bytes.length - offset;
+    src.position(offset);
+    src.limit(offset + len);
+    o.writeBytes(src);
+    assertEquals(len, o.size());
+    Assert.assertArrayEquals(ArrayUtil.copyOfSubArray(bytes, offset, offset + len), o.toArrayCopy());
+  }
+
+  /** A single write of several megabytes must be stored completely. */
+  @Test
+  public void testLargeArrayAdd() {
+    ByteBuffersDataOutput o = new ByteBuffersDataOutput();
+    int MB = 1024 * 1024;
+    byte [] bytes = randomBytesOfLength(5 * MB, 15 * MB);
+    int offset = randomIntBetween(0, 100);
+    int len = bytes.length - offset;
+    o.writeBytes(bytes, offset, len);
+    assertEquals(len, o.size());
+    Assert.assertArrayEquals(ArrayUtil.copyOfSubArray(bytes, offset, offset + len), o.toArrayCopy());
+  }
+
+  @Test
+  public void testToBufferListReturnsReadOnlyBuffers() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    dst.writeBytes(new byte [100]);
+    for (ByteBuffer bb : dst.toBufferList()) {
+      assertTrue(bb.isReadOnly());
+    }
+  }
+  
+  @Test
+  public void testToWriteableBufferListReturnsOriginalBuffers() throws Exception {
+    ByteBuffersDataOutput dst = new ByteBuffersDataOutput();
+    for (ByteBuffer bb : dst.toWriteableBufferList()) {
+      assertFalse(bb.isReadOnly());
+      assertTrue(bb.hasArray()); // even the empty buffer should have a backing array.
+    }
+
+    dst.writeBytes(new byte [100]);
+    for (ByteBuffer bb : dst.toWriteableBufferList()) {
+      assertFalse(bb.isReadOnly());
+      assertTrue(bb.hasArray()); // heap-based by default, so array should be there.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lucene-solr/blob/f762953a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java
----------------------------------------------------------------------
diff --git a/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java
new file mode 100644
index 0000000..5f2d447
--- /dev/null
+++ b/lucene/core/src/test/org/apache/lucene/store/TestByteBuffersDirectory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.store;
+
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+import org.apache.lucene.analysis.MockAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.IndexWriterConfig.OpenMode;
+import org.apache.lucene.util.English;
+import org.junit.Test;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+/**
+ * Runs the {@link BaseDirectoryTestCase} suite against {@link ByteBuffersDirectory}, once for
+ * each configuration returned by {@link #parametersWithCustomName()}.
+ */
+public class TestByteBuffersDirectory extends BaseDirectoryTestCase {
+  /** Creates the directory instance for the current configuration. */
+  private Supplier<ByteBuffersDirectory> implSupplier;
+
+  /**
+   * @param implSupplier creates the directory under test.
+   * @param name label of the configuration; it is not stored here, it is the second argument
+   *        referenced by the {@code argumentFormatting} pattern of the parameters factory.
+   */
+  public TestByteBuffersDirectory(Supplier<ByteBuffersDirectory> implSupplier, String name) {
+    this.implSupplier = implSupplier;
+  }
+  
+  /** Returns a new directory from the supplier; {@code path} is ignored. */
+  @Override
+  protected Directory getDirectory(Path path) throws IOException {
+    return implSupplier.get();
+  }
+
+  /** Indexes up to ten small documents, commits, and checks the document count. */
+  @Test
+  public void testBuildIndex() throws IOException {
+    try (Directory dir = getDirectory(null);
+         IndexWriter writer = new IndexWriter(
+             dir,
+             new IndexWriterConfig(new MockAnalyzer(random())).setOpenMode(OpenMode.CREATE))) {
+      final int docs = RandomizedTest.randomIntBetween(0, 10);
+      int remaining = docs;
+      while (remaining > 0) {
+        Document doc = new Document();
+        doc.add(newStringField("content", English.intToEnglish(remaining).trim(), Field.Store.YES));
+        writer.addDocument(doc);
+        remaining--;
+      }
+      writer.commit();
+      assertEquals(docs, writer.numDocs());
+    }
+  }
+  
+  /**
+   * Supplies one {@code (directory supplier, label)} pair per output-to-input conversion
+   * strategy; all of them use a {@link SingleInstanceLockFactory} and default outputs.
+   */
+  @ParametersFactory(argumentFormatting = "impl=%2$s")
+  public static Iterable<Object[]> parametersWithCustomName() {
+    Supplier<ByteBuffersDirectory> manyBuffers = () -> new ByteBuffersDirectory(
+        new SingleInstanceLockFactory(), 
+        ByteBuffersDataOutput::new,
+        ByteBuffersDirectory.OUTPUT_AS_MANY_BUFFERS);
+    Supplier<ByteBuffersDirectory> oneBuffer = () -> new ByteBuffersDirectory(
+        new SingleInstanceLockFactory(), 
+        ByteBuffersDataOutput::new,
+        ByteBuffersDirectory.OUTPUT_AS_ONE_BUFFER);
+    Supplier<ByteBuffersDirectory> luceneBuffers = () -> new ByteBuffersDirectory(
+        new SingleInstanceLockFactory(), 
+        ByteBuffersDataOutput::new,
+        ByteBuffersDirectory.OUTPUT_AS_MANY_BUFFERS_LUCENE);
+    Supplier<ByteBuffersDirectory> byteArray = () -> new ByteBuffersDirectory(
+        new SingleInstanceLockFactory(), 
+        ByteBuffersDataOutput::new,
+        ByteBuffersDirectory.OUTPUT_AS_BYTE_ARRAY);
+
+    return Arrays.asList(new Object [][] {
+      {manyBuffers, "many buffers (heap)"},
+      {oneBuffer, "one buffer (heap)"},
+      {luceneBuffers, "lucene's buffers (heap)"},
+      {byteArray, "byte array (heap)"},
+    });
+  }
+}