You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/04 13:46:30 UTC

[09/10] cassandra git commit: Faster sequential IO (CASSANDRA-8630)

Faster sequential IO (CASSANDRA-8630)

Merge RandomAccessReader and NIODataInputStream class hierarchies
to share performance optimisation work across all readers.

patch by stefania; reviewed by ariel and benedict for CASSANDRA-8630


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce63ccc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce63ccc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce63ccc8

Branch: refs/heads/cassandra-3.0
Commit: ce63ccc842dc6e7129765391c611402eb02a3a23
Parents: 7c0bfe9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jul 27 16:34:46 2015 +0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 4 12:42:35 2015 +0100

----------------------------------------------------------------------
 build.xml                                       |   8 +-
 lib/licenses/ohc-0.3.4.txt                      | 201 --------
 lib/licenses/ohc-0.4.2.txt                      | 201 ++++++++
 lib/ohc-core-0.4.2.jar                          | Bin 0 -> 126802 bytes
 lib/ohc-core-0.4.jar                            | Bin 127890 -> 0 bytes
 lib/ohc-core-j8-0.4.2.jar                       | Bin 0 -> 4994 bytes
 lib/ohc-core-j8-0.4.jar                         | Bin 4989 -> 0 bytes
 .../apache/cassandra/cache/AutoSavingCache.java |   2 +-
 .../org/apache/cassandra/cache/OHCProvider.java |   3 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   2 +-
 .../db/commitlog/CommitLogReplayer.java         |  19 +-
 .../cassandra/hints/ChecksummedDataInput.java   | 128 +++--
 .../org/apache/cassandra/hints/HintsReader.java |  67 ++-
 .../compress/CompressedRandomAccessReader.java  | 147 +++---
 .../io/compress/CompressedThrottledReader.java  |  48 --
 .../io/compress/CompressionMetadata.java        |   2 +-
 .../cassandra/io/sstable/KeyIterator.java       |   1 -
 .../io/sstable/format/SSTableReader.java        | 100 ++--
 .../cassandra/io/sstable/format/Version.java    |   2 +
 .../io/sstable/format/big/BigFormat.java        |   9 +
 .../io/sstable/format/big/BigTableReader.java   | 108 ++---
 .../io/sstable/format/big/BigTableWriter.java   |   2 -
 .../cassandra/io/util/AbstractDataInput.java    | 343 -------------
 .../io/util/BufferedSegmentedFile.java          |  24 -
 .../cassandra/io/util/ByteBufferDataInput.java  | 171 -------
 .../apache/cassandra/io/util/ChannelProxy.java  |   4 +-
 .../io/util/ChecksummedRandomAccessReader.java  |  62 ++-
 .../io/util/CompressedSegmentedFile.java        | 100 ++--
 .../cassandra/io/util/DataInputBuffer.java      |  24 +-
 .../io/util/DataIntegrityMetadata.java          |  11 +-
 .../apache/cassandra/io/util/FileDataInput.java |  25 +-
 .../io/util/FileSegmentInputStream.java         |  96 ++++
 .../cassandra/io/util/ICompressedFile.java      |   9 +-
 .../cassandra/io/util/MemoryInputStream.java    |  54 ++-
 .../cassandra/io/util/MmappedRegions.java       | 344 +++++++++++++
 .../cassandra/io/util/MmappedSegmentedFile.java | 214 +++-----
 .../cassandra/io/util/NIODataInputStream.java   | 352 +-------------
 .../cassandra/io/util/RandomAccessReader.java   | 417 ++++++++++------
 .../io/util/RebufferingInputStream.java         | 286 +++++++++++
 .../apache/cassandra/io/util/SegmentedFile.java | 115 ++---
 .../cassandra/io/util/ThrottledReader.java      |  48 --
 .../compress/CompressedStreamWriter.java        |  10 +-
 .../apache/cassandra/utils/ByteBufferUtil.java  |   3 -
 .../org/apache/cassandra/utils/Throwables.java  |  45 +-
 .../apache/cassandra/utils/vint/VIntCoding.java |   2 +-
 .../cassandra/db/commitlog/CommitLogTest.java   |   8 +-
 .../db/commitlog/CommitLogTestReplayer.java     |   3 +-
 .../hints/ChecksummedDataInputTest.java         | 227 +++++++--
 .../io/ChecksummedRandomAccessReaderTest.java   | 127 -----
 .../cassandra/io/RandomAccessReaderTest.java    | 269 -----------
 .../CompressedRandomAccessReaderTest.java       | 245 +++++-----
 .../CompressedSequentialWriterTest.java         |   2 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |   3 +-
 .../io/util/BufferedDataOutputStreamTest.java   |  12 +-
 .../io/util/BufferedRandomAccessFileTest.java   |  91 +---
 .../util/ChecksummedRandomAccessReaderTest.java | 127 +++++
 .../io/util/FileSegmentInputStreamTest.java     | 131 +++++
 .../apache/cassandra/io/util/MemoryTest.java    |  37 ++
 .../cassandra/io/util/MmappedRegionsTest.java   | 375 ++++++++++++++
 .../io/util/NIODataInputStreamTest.java         |  16 +-
 .../io/util/RandomAccessReaderTest.java         | 483 +++++++++++++++++++
 61 files changed, 3339 insertions(+), 2626 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 28f08d6..252b1a8 100644
--- a/build.xml
+++ b/build.xml
@@ -415,8 +415,8 @@
           <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.5" classifier="shaded" />
           -->
           <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj" version="4.4.2" />
-          <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4" />
-          <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4" />
+          <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.2" />
+          <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4.2" />
           <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations" version="1.2.0" />
           <dependency groupId="org.fusesource" artifactId="sigar" version="1.6.4">
           	<exclusion groupId="log4j" artifactId="log4j"/>
@@ -470,8 +470,8 @@
         <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
         -->
         <dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
-        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4" />
-        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4" />
+        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.2" />
+        <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4.2" />
         <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
         <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess"/>
         <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/licenses/ohc-0.3.4.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/ohc-0.3.4.txt b/lib/licenses/ohc-0.3.4.txt
deleted file mode 100644
index eb6b5d3..0000000
--- a/lib/licenses/ohc-0.3.4.txt
+++ /dev/null
@@ -1,201 +0,0 @@
-                                 Apache License
-                           Version 2.0, January 2004
-                        http://www.apache.org/licenses/
-
-   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
-   1. Definitions.
-
-      "License" shall mean the terms and conditions for use, reproduction,
-      and distribution as defined by Sections 1 through 9 of this document.
-
-      "Licensor" shall mean the copyright owner or entity authorized by
-      the copyright owner that is granting the License.
-
-      "Legal Entity" shall mean the union of the acting entity and all
-      other entities that control, are controlled by, or are under common
-      control with that entity. For the purposes of this definition,
-      "control" means (i) the power, direct or indirect, to cause the
-      direction or management of such entity, whether by contract or
-      otherwise, or (ii) ownership of fifty percent (50%) or more of the
-      outstanding shares, or (iii) beneficial ownership of such entity.
-
-      "You" (or "Your") shall mean an individual or Legal Entity
-      exercising permissions granted by this License.
-
-      "Source" form shall mean the preferred form for making modifications,
-      including but not limited to software source code, documentation
-      source, and configuration files.
-
-      "Object" form shall mean any form resulting from mechanical
-      transformation or translation of a Source form, including but
-      not limited to compiled object code, generated documentation,
-      and conversions to other media types.
-
-      "Work" shall mean the work of authorship, whether in Source or
-      Object form, made available under the License, as indicated by a
-      copyright notice that is included in or attached to the work
-      (an example is provided in the Appendix below).
-
-      "Derivative Works" shall mean any work, whether in Source or Object
-      form, that is based on (or derived from) the Work and for which the
-      editorial revisions, annotations, elaborations, or other modifications
-      represent, as a whole, an original work of authorship. For the purposes
-      of this License, Derivative Works shall not include works that remain
-      separable from, or merely link (or bind by name) to the interfaces of,
-      the Work and Derivative Works thereof.
-
-      "Contribution" shall mean any work of authorship, including
-      the original version of the Work and any modifications or additions
-      to that Work or Derivative Works thereof, that is intentionally
-      submitted to Licensor for inclusion in the Work by the copyright owner
-      or by an individual or Legal Entity authorized to submit on behalf of
-      the copyright owner. For the purposes of this definition, "submitted"
-      means any form of electronic, verbal, or written communication sent
-      to the Licensor or its representatives, including but not limited to
-      communication on electronic mailing lists, source code control systems,
-      and issue tracking systems that are managed by, or on behalf of, the
-      Licensor for the purpose of discussing and improving the Work, but
-      excluding communication that is conspicuously marked or otherwise
-      designated in writing by the copyright owner as "Not a Contribution."
-
-      "Contributor" shall mean Licensor and any individual or Legal Entity
-      on behalf of whom a Contribution has been received by Licensor and
-      subsequently incorporated within the Work.
-
-   2. Grant of Copyright License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      copyright license to reproduce, prepare Derivative Works of,
-      publicly display, publicly perform, sublicense, and distribute the
-      Work and such Derivative Works in Source or Object form.
-
-   3. Grant of Patent License. Subject to the terms and conditions of
-      this License, each Contributor hereby grants to You a perpetual,
-      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
-      (except as stated in this section) patent license to make, have made,
-      use, offer to sell, sell, import, and otherwise transfer the Work,
-      where such license applies only to those patent claims licensable
-      by such Contributor that are necessarily infringed by their
-      Contribution(s) alone or by combination of their Contribution(s)
-      with the Work to which such Contribution(s) was submitted. If You
-      institute patent litigation against any entity (including a
-      cross-claim or counterclaim in a lawsuit) alleging that the Work
-      or a Contribution incorporated within the Work constitutes direct
-      or contributory patent infringement, then any patent licenses
-      granted to You under this License for that Work shall terminate
-      as of the date such litigation is filed.
-
-   4. Redistribution. You may reproduce and distribute copies of the
-      Work or Derivative Works thereof in any medium, with or without
-      modifications, and in Source or Object form, provided that You
-      meet the following conditions:
-
-      (a) You must give any other recipients of the Work or
-          Derivative Works a copy of this License; and
-
-      (b) You must cause any modified files to carry prominent notices
-          stating that You changed the files; and
-
-      (c) You must retain, in the Source form of any Derivative Works
-          that You distribute, all copyright, patent, trademark, and
-          attribution notices from the Source form of the Work,
-          excluding those notices that do not pertain to any part of
-          the Derivative Works; and
-
-      (d) If the Work includes a "NOTICE" text file as part of its
-          distribution, then any Derivative Works that You distribute must
-          include a readable copy of the attribution notices contained
-          within such NOTICE file, excluding those notices that do not
-          pertain to any part of the Derivative Works, in at least one
-          of the following places: within a NOTICE text file distributed
-          as part of the Derivative Works; within the Source form or
-          documentation, if provided along with the Derivative Works; or,
-          within a display generated by the Derivative Works, if and
-          wherever such third-party notices normally appear. The contents
-          of the NOTICE file are for informational purposes only and
-          do not modify the License. You may add Your own attribution
-          notices within Derivative Works that You distribute, alongside
-          or as an addendum to the NOTICE text from the Work, provided
-          that such additional attribution notices cannot be construed
-          as modifying the License.
-
-      You may add Your own copyright statement to Your modifications and
-      may provide additional or different license terms and conditions
-      for use, reproduction, or distribution of Your modifications, or
-      for any such Derivative Works as a whole, provided Your use,
-      reproduction, and distribution of the Work otherwise complies with
-      the conditions stated in this License.
-
-   5. Submission of Contributions. Unless You explicitly state otherwise,
-      any Contribution intentionally submitted for inclusion in the Work
-      by You to the Licensor shall be under the terms and conditions of
-      this License, without any additional terms or conditions.
-      Notwithstanding the above, nothing herein shall supersede or modify
-      the terms of any separate license agreement you may have executed
-      with Licensor regarding such Contributions.
-
-   6. Trademarks. This License does not grant permission to use the trade
-      names, trademarks, service marks, or product names of the Licensor,
-      except as required for reasonable and customary use in describing the
-      origin of the Work and reproducing the content of the NOTICE file.
-
-   7. Disclaimer of Warranty. Unless required by applicable law or
-      agreed to in writing, Licensor provides the Work (and each
-      Contributor provides its Contributions) on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-      implied, including, without limitation, any warranties or conditions
-      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
-      PARTICULAR PURPOSE. You are solely responsible for determining the
-      appropriateness of using or redistributing the Work and assume any
-      risks associated with Your exercise of permissions under this License.
-
-   8. Limitation of Liability. In no event and under no legal theory,
-      whether in tort (including negligence), contract, or otherwise,
-      unless required by applicable law (such as deliberate and grossly
-      negligent acts) or agreed to in writing, shall any Contributor be
-      liable to You for damages, including any direct, indirect, special,
-      incidental, or consequential damages of any character arising as a
-      result of this License or out of the use or inability to use the
-      Work (including but not limited to damages for loss of goodwill,
-      work stoppage, computer failure or malfunction, or any and all
-      other commercial damages or losses), even if such Contributor
-      has been advised of the possibility of such damages.
-
-   9. Accepting Warranty or Additional Liability. While redistributing
-      the Work or Derivative Works thereof, You may choose to offer,
-      and charge a fee for, acceptance of support, warranty, indemnity,
-      or other liability obligations and/or rights consistent with this
-      License. However, in accepting such obligations, You may act only
-      on Your own behalf and on Your sole responsibility, not on behalf
-      of any other Contributor, and only if You agree to indemnify,
-      defend, and hold each Contributor harmless for any liability
-      incurred by, or claims asserted against, such Contributor by reason
-      of your accepting any such warranty or additional liability.
-
-   END OF TERMS AND CONDITIONS
-
-   APPENDIX: How to apply the Apache License to your work.
-
-      To apply the Apache License to your work, attach the following
-      boilerplate notice, with the fields enclosed by brackets "[]"
-      replaced with your own identifying information. (Don't include
-      the brackets!)  The text should be enclosed in the appropriate
-      comment syntax for the file format. We also recommend that a
-      file or class name and description of purpose be included on the
-      same "printed page" as the copyright notice for easier
-      identification within third-party archives.
-
-   Copyright 2014 Robert Stupp, Koeln, Germany, robert-stupp.de
-
-   Licensed under the Apache License, Version 2.0 (the "License");
-   you may not use this file except in compliance with the License.
-   You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/licenses/ohc-0.4.2.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/ohc-0.4.2.txt b/lib/licenses/ohc-0.4.2.txt
new file mode 100644
index 0000000..eb6b5d3
--- /dev/null
+++ b/lib/licenses/ohc-0.4.2.txt
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright 2014 Robert Stupp, Koeln, Germany, robert-stupp.de
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-0.4.2.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-0.4.2.jar b/lib/ohc-core-0.4.2.jar
new file mode 100644
index 0000000..019adcd
Binary files /dev/null and b/lib/ohc-core-0.4.2.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-0.4.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-0.4.jar b/lib/ohc-core-0.4.jar
deleted file mode 100644
index 1b1b939..0000000
Binary files a/lib/ohc-core-0.4.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-j8-0.4.2.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-j8-0.4.2.jar b/lib/ohc-core-j8-0.4.2.jar
new file mode 100644
index 0000000..583f4aa
Binary files /dev/null and b/lib/ohc-core-j8-0.4.2.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-j8-0.4.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-j8-0.4.jar b/lib/ohc-core-j8-0.4.jar
deleted file mode 100644
index f97ddf5..0000000
Binary files a/lib/ohc-core-j8-0.4.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 8650d47..ebd2830 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -68,7 +68,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     {
         public InputStream getInputStream(File dataPath, File crcPath) throws IOException
         {
-            return ChecksummedRandomAccessReader.open(dataPath, crcPath);
+            return new ChecksummedRandomAccessReader.Builder(dataPath, crcPath).build();
         }
 
         public OutputStream getOutputStream(File dataPath, File crcPath)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
index b0b4521..c6c6bb7 100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBufferFixed;
 import org.apache.cassandra.io.util.NIODataInputStream;
+import org.apache.cassandra.io.util.RebufferingInputStream;
 import org.caffinitas.ohc.OHCache;
 import org.caffinitas.ohc.OHCacheBuilder;
 
@@ -172,7 +173,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
         {
             try
             {
-                NIODataInputStream in = new DataInputBuffer(buf, false);
+                RebufferingInputStream in = new DataInputBuffer(buf, false);
                 boolean isSentinel = in.readBoolean();
                 if (isSentinel)
                     return new RowCacheSentinel(in.readLong());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index cf8e14d..d54ee8b 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -676,7 +676,7 @@ public final class SystemKeyspace
     {
         try
         {
-            NIODataInputStream in = new DataInputBuffer(bytes, true);
+            RebufferingInputStream in = new DataInputBuffer(bytes, true);
             return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 4f50008..7049191 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -47,12 +47,13 @@ import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+import org.apache.cassandra.io.util.RebufferingInputStream;
 import org.apache.cassandra.schema.CompressionParams;
 import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
+import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.FBUtilities;
@@ -290,8 +291,8 @@ public class CommitLogReplayer
     public void recover(File file, boolean tolerateTruncation) throws IOException
     {
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
-        RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
-        try
+        try(ChannelProxy channel = new ChannelProxy(file);
+            RandomAccessReader reader = RandomAccessReader.open(channel))
         {
             if (desc.version < CommitLogDescriptor.VERSION_21)
             {
@@ -299,7 +300,7 @@ public class CommitLogReplayer
                     return;
                 if (globalPosition.segment == desc.id)
                     reader.seek(globalPosition.position);
-                replaySyncSection(reader, (int) reader.getPositionLimit(), desc, desc.fileName(), tolerateTruncation);
+                replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
                 return;
             }
 
@@ -388,7 +389,7 @@ public class CommitLogReplayer
                         if (uncompressedLength > uncompressedBuffer.length)
                             uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
                         compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
-                        sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0);
+                        sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos);
                         errorContext = "compressed section at " + start + " in " + errorContext;
                     }
                     catch (IOException | ArrayIndexOutOfBoundsException e)
@@ -403,10 +404,6 @@ public class CommitLogReplayer
                 if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
                     break;
             }
-        }
-        finally
-        {
-            FileUtils.closeQuietly(reader);
             logger.info("Finished reading {}", file);
         }
     }
@@ -522,7 +519,7 @@ public class CommitLogReplayer
     {
 
         final Mutation mutation;
-        try (NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
+        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
         {
             mutation = Mutation.serializer.deserialize(bufIn,
                                                        desc.getMessagingVersion(),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index fa727bc..543f14e 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -17,98 +17,148 @@
  */
 package org.apache.cassandra.hints;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.zip.CRC32;
 
-import org.apache.cassandra.io.util.AbstractDataInput;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.RandomAccessReader;
 
 /**
- * An {@link AbstractDataInput} wrapper that calctulates the CRC in place.
+ * A {@link RandomAccessReader} wrapper that calculates the CRC in place.
  *
  * Useful for {@link org.apache.cassandra.hints.HintsReader}, for example, where we must verify the CRC, yet don't want
- * to allocate an extra byte array just that purpose.
+ * to allocate an extra byte array just for that purpose. The CRC can be embedded in the input stream and checked via checkCrc().
  *
- * In addition to calculating the CRC, allows to enforce a maximim known size. This is needed
+ * In addition to calculating the CRC, it allows enforcing a maximum known size. This is needed
  * so that {@link org.apache.cassandra.db.Mutation.MutationSerializer} doesn't blow up the heap when deserializing a
  * corrupted sequence by reading a huge corrupted length of bytes via
  * via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
  */
-public final class ChecksummedDataInput extends AbstractDataInput
+public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
 {
     private final CRC32 crc;
-    private final AbstractDataInput source;
-    private int limit;
+    private int crcPosition;
+    private boolean crcUpdateDisabled;
 
-    private ChecksummedDataInput(AbstractDataInput source)
+    private long limit;
+    private FileMark limitMark;
+
+    private ChecksummedDataInput(Builder builder)
     {
-        this.source = source;
+        super(builder);
 
         crc = new CRC32();
-        limit = Integer.MAX_VALUE;
+        crcPosition = 0;
+        crcUpdateDisabled = false;
+
+        resetLimit();
     }
 
-    public static ChecksummedDataInput wrap(AbstractDataInput source)
+    public static ChecksummedDataInput open(File file)
     {
-        return new ChecksummedDataInput(source);
+        return new Builder(new ChannelProxy(file)).build();
     }
 
     public void resetCrc()
     {
         crc.reset();
+        crcPosition = buffer.position();
     }
 
-    public void resetLimit()
+    public void limit(long newLimit)
     {
-        limit = Integer.MAX_VALUE;
+        limit = newLimit;
+        limitMark = mark();
     }
 
-    public void limit(int newLimit)
+    public void resetLimit()
     {
-        limit = newLimit;
+        limit = Long.MAX_VALUE;
+        limitMark = null;
     }
 
-    public int bytesRemaining()
+    public void checkLimit(int length) throws IOException
     {
-        return limit;
+        if (limitMark == null)
+            return;
+
+        if ((bytesPastLimit() + length) > limit)
+            throw new IOException("Digest mismatch exception");
     }
 
-    public int getCrc()
+    public long bytesPastLimit()
     {
-        return (int) crc.getValue();
+        assert limitMark != null;
+        return bytesPastMark(limitMark);
     }
 
-    public void seek(long position) throws IOException
+    public boolean checkCrc() throws IOException
     {
-        source.seek(position);
+        try
+        {
+            updateCrc();
+
+            // we must disable crc updates in case we rebuffer
+            // when calling readInt()
+            crcUpdateDisabled = true;
+            return ((int) crc.getValue()) == readInt();
+        }
+        finally
+        {
+            crcPosition = buffer.position();
+            crcUpdateDisabled = false;
+        }
     }
 
-    public long getPosition()
+    @Override
+    public void readFully(byte[] b) throws IOException
     {
-        return source.getPosition();
+        checkLimit(b.length);
+        super.readFully(b);
     }
 
-    public long getPositionLimit()
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException
     {
-        return source.getPositionLimit();
+        checkLimit(len);
+        return super.read(b, off, len);
     }
 
-    public int read() throws IOException
+    @Override
+    public void reBuffer()
     {
-        int b = source.read();
-        crc.update(b);
-        limit--;
-        return b;
+        updateCrc();
+        super.reBuffer();
+        crcPosition = buffer.position();
     }
 
-    @Override
-    public int read(byte[] buff, int offset, int length) throws IOException
+    private void updateCrc()
     {
-        if (length > limit)
-            throw new IOException("Digest mismatch exception");
+        if (crcPosition == buffer.position() | crcUpdateDisabled)
+            return;
+
+        assert crcPosition >= 0 && crcPosition < buffer.position();
 
-        int copied = source.read(buff, offset, length);
-        crc.update(buff, offset, copied);
-        limit -= copied;
-        return copied;
+        ByteBuffer unprocessed = buffer.duplicate();
+        unprocessed.position(crcPosition)
+                   .limit(buffer.position());
+
+        crc.update(unprocessed);
+    }
+
+    public final static class Builder extends RandomAccessReader.Builder
+    {
+        public Builder(ChannelProxy channel)
+        {
+            super(channel);
+        }
+
+        public ChecksummedDataInput build()
+        {
+            return new ChecksummedDataInput(this);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index 7d164b4..bc83654 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 
 import javax.annotation.Nullable;
 
+import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.slf4j.Logger;
@@ -32,7 +33,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.UnknownColumnFamilyException;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CLibrary;
@@ -57,25 +57,23 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
     private final HintsDescriptor descriptor;
     private final File file;
-    private final RandomAccessReader reader;
-    private final ChecksummedDataInput crcInput;
+    private final ChecksummedDataInput input;
 
     // we pass the RateLimiter into HintsReader itself because it's cheaper to calculate the size before the hint is deserialized
     @Nullable
     private final RateLimiter rateLimiter;
 
-    private HintsReader(HintsDescriptor descriptor, File file, RandomAccessReader reader, RateLimiter rateLimiter)
+    private HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
     {
         this.descriptor = descriptor;
         this.file = file;
-        this.reader = reader;
-        this.crcInput = ChecksummedDataInput.wrap(reader);
+        this.input = reader;
         this.rateLimiter = rateLimiter;
     }
 
     static HintsReader open(File file, RateLimiter rateLimiter)
     {
-        RandomAccessReader reader = RandomAccessReader.open(file);
+        ChecksummedDataInput reader = ChecksummedDataInput.open(file);
         try
         {
             HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
@@ -83,7 +81,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         }
         catch (IOException e)
         {
-            reader.close();
+            FileUtils.closeQuietly(reader);
             throw new FSReadError(e, file);
         }
     }
@@ -95,7 +93,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
     public void close()
     {
-        FileUtils.closeQuietly(reader);
+        FileUtils.closeQuietly(input);
     }
 
     public HintsDescriptor descriptor()
@@ -105,7 +103,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
     void seek(long newPosition)
     {
-        reader.seek(newPosition);
+        input.seek(newPosition);
     }
 
     public Iterator<Page> iterator()
@@ -138,12 +136,12 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         @SuppressWarnings("resource")
         protected Page computeNext()
         {
-            CLibrary.trySkipCache(reader.getChannel().getFileDescriptor(), 0, reader.getFilePointer(), reader.getPath());
+            CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath());
 
-            if (reader.length() == reader.getFilePointer())
+            if (input.length() == input.getFilePointer())
                 return endOfData();
 
-            return new Page(reader.getFilePointer());
+            return new Page(input.getFilePointer());
         }
     }
 
@@ -166,9 +164,9 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
             do
             {
-                long position = reader.getFilePointer();
+                long position = input.getFilePointer();
 
-                if (reader.length() == position)
+                if (input.length() == position)
                     return endOfData(); // reached EOF
 
                 if (position - offset >= PAGE_SIZE)
@@ -190,13 +188,13 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
         private Hint computeNextInternal() throws IOException
         {
-            crcInput.resetCrc();
-            crcInput.resetLimit();
+            input.resetCrc();
+            input.resetLimit();
 
-            int size = crcInput.readInt();
+            int size = input.readInt();
 
             // if we cannot corroborate the size via crc, then we cannot safely skip this hint
-            if (reader.readInt() != crcInput.getCrc())
+            if (!input.checkCrc())
                 throw new IOException("Digest mismatch exception");
 
             return readHint(size);
@@ -206,12 +204,13 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         {
             if (rateLimiter != null)
                 rateLimiter.acquire(size);
-            crcInput.limit(size);
+            input.limit(size);
 
             Hint hint;
             try
             {
-                hint = Hint.serializer.deserialize(crcInput, descriptor.messagingVersion());
+                hint = Hint.serializer.deserialize(input, descriptor.messagingVersion());
+                input.checkLimit(0);
             }
             catch (UnknownColumnFamilyException e)
             {
@@ -219,18 +218,18 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
                             descriptor.hostId,
                             e.cfId,
                             descriptor.fileName());
-                reader.skipBytes(crcInput.bytesRemaining());
+                input.skipBytes(Ints.checkedCast(size - input.bytesPastLimit()));
 
                 return null;
             }
 
-            if (reader.readInt() == crcInput.getCrc())
+            if (input.checkCrc())
                 return hint;
 
             // log a warning and skip the corrupted entry
             logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
                         descriptor.hostId,
-                        crcInput.getPosition() - size - 4,
+                        input.getPosition() - size - 4,
                         descriptor.fileName());
             return null;
         }
@@ -255,9 +254,9 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
             do
             {
-                long position = reader.getFilePointer();
+                long position = input.getFilePointer();
 
-                if (reader.length() == position)
+                if (input.length() == position)
                     return endOfData(); // reached EOF
 
                 if (position - offset >= PAGE_SIZE)
@@ -279,13 +278,13 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
 
         private ByteBuffer computeNextInternal() throws IOException
         {
-            crcInput.resetCrc();
-            crcInput.resetLimit();
+            input.resetCrc();
+            input.resetLimit();
 
-            int size = crcInput.readInt();
+            int size = input.readInt();
 
             // if we cannot corroborate the size via crc, then we cannot safely skip this hint
-            if (reader.readInt() != crcInput.getCrc())
+            if (!input.checkCrc())
                 throw new IOException("Digest mismatch exception");
 
             return readBuffer(size);
@@ -295,16 +294,16 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
         {
             if (rateLimiter != null)
                 rateLimiter.acquire(size);
-            crcInput.limit(size);
+            input.limit(size);
 
-            ByteBuffer buffer = ByteBufferUtil.read(crcInput, size);
-            if (reader.readInt() == crcInput.getCrc())
+            ByteBuffer buffer = ByteBufferUtil.read(input, size);
+            if (input.checkCrc())
                 return buffer;
 
             // log a warning and skip the corrupted entry
             logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
                         descriptor.hostId,
-                        crcInput.getPosition() - size - 4,
+                        input.getPosition() - size - 4,
                         descriptor.fileName());
             return null;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index c38f4d2..0242871 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -19,11 +19,7 @@ package org.apache.cassandra.io.compress;
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
 import com.google.common.primitives.Ints;
@@ -31,7 +27,6 @@ import com.google.common.primitives.Ints;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.memory.BufferPool;
 
 /**
@@ -40,18 +35,6 @@ import org.apache.cassandra.utils.memory.BufferPool;
  */
 public class CompressedRandomAccessReader extends RandomAccessReader
 {
-    public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata)
-    {
-        return new CompressedRandomAccessReader(channel, metadata, null);
-    }
-
-    public static CompressedRandomAccessReader open(ICompressedFile file)
-    {
-        return new CompressedRandomAccessReader(file.channel(), file.getMetadata(), file);
-    }
-
-    private final TreeMap<Long, MappedByteBuffer> chunkSegments;
-
     private final CompressionMetadata metadata;
 
     // we read the raw compressed bytes into this buffer, then move the uncompressed ones into super.buffer.
@@ -63,39 +46,59 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     // raw checksum bytes
     private ByteBuffer checksumBytes;
 
-    protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file)
+    protected CompressedRandomAccessReader(Builder builder)
     {
-        super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().preferredBufferType());
-        this.metadata = metadata;
-        checksum = metadata.checksumType.newInstance();
+        super(builder.initializeBuffers(false));
+        this.metadata = builder.metadata;
+        this.checksum = metadata.checksumType.newInstance();
 
-        chunkSegments = file == null ? null : file.chunkSegments();
-        if (chunkSegments == null)
-        {
-            compressed = allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().preferredBufferType());
-            checksumBytes = ByteBuffer.wrap(new byte[4]);
-        }
+        initializeBuffer();
     }
 
-    protected int getBufferSize(int size)
+    @Override
+    protected int getBufferSize(RandomAccessReader.Builder builder)
     {
-        assert Integer.bitCount(size) == 1; //must be a power of two
-        return size;
+        // this is the chunk data length, throttling is OK with this
+        return builder.bufferSize;
     }
 
     @Override
-    public void close()
+    protected void initializeBuffer()
     {
-        super.close();
+        buffer = allocateBuffer(bufferSize);
+        buffer.limit(0);
 
-        if (compressed != null)
+        if (regions == null)
         {
-            BufferPool.put(compressed);
-            compressed = null;
+            compressed = allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
+            checksumBytes = ByteBuffer.wrap(new byte[4]);
         }
     }
 
-    private void reBufferStandard()
+    @Override
+    protected void releaseBuffer()
+    {
+        try
+        {
+            if (buffer != null)
+            {
+                BufferPool.put(buffer);
+                buffer = null;
+            }
+        }
+        finally
+        {
+            // this will always be null if using mmap access mode (unlike in parent, where buffer is set to a region)
+            if (compressed != null)
+            {
+                BufferPool.put(compressed);
+                compressed = null;
+            }
+        }
+    }
+
+    @Override
+    protected void reBufferStandard()
     {
         try
         {
@@ -105,13 +108,19 @@ public class CompressedRandomAccessReader extends RandomAccessReader
             CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 
             if (compressed.capacity() < chunk.length)
-                compressed = allocateBuffer(chunk.length, metadata.compressor().preferredBufferType());
+            {
+                BufferPool.put(compressed);
+                compressed = allocateBuffer(chunk.length);
+            }
             else
+            {
                 compressed.clear();
-            compressed.limit(chunk.length);
+            }
 
+            compressed.limit(chunk.length);
             if (channel.read(compressed, chunk.offset) != chunk.length)
                 throw new CorruptBlockException(getPath(), chunk);
+
             compressed.flip();
             buffer.clear();
 
@@ -158,7 +167,8 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         }
     }
 
-    private void reBufferMmap()
+    @Override
+    protected void reBufferMmap()
     {
         try
         {
@@ -167,10 +177,10 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
             CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
 
-            Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
-            long segmentOffset = entry.getKey();
+            MmappedRegions.Region region = regions.floor(chunk.offset);
+            long segmentOffset = region.bottom();
             int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
-            ByteBuffer compressedChunk = entry.getValue().duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java
+            ByteBuffer compressedChunk = region.buffer.duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java
 
             compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
 
@@ -218,19 +228,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader
 
     }
 
-    @Override
-    protected void reBuffer()
-    {
-        if (chunkSegments != null)
-        {
-            reBufferMmap();
-        }
-        else
-        {
-            reBufferStandard();
-        }
-    }
-
     private int checksum(CompressionMetadata.Chunk chunk) throws IOException
     {
         long position = chunk.offset + chunk.length;
@@ -240,11 +237,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         return checksumBytes.getInt(0);
     }
 
-    public int getTotalBufferSize()
-    {
-        return super.getTotalBufferSize() + (chunkSegments != null ? 0 : compressed.capacity());
-    }
-
     @Override
     public long length()
     {
@@ -256,4 +248,39 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     {
         return String.format("%s - chunk length %d, data length %d.", getPath(), metadata.chunkLength(), metadata.dataLength);
     }
+
+    public final static class Builder extends RandomAccessReader.Builder
+    {
+        private final CompressionMetadata metadata;
+
+        public Builder(ICompressedFile file)
+        {
+            super(file.channel());
+            this.metadata = applyMetadata(file.getMetadata());
+            this.regions = file.regions();
+        }
+
+        public Builder(ChannelProxy channel, CompressionMetadata metadata)
+        {
+            super(channel);
+            this.metadata = applyMetadata(metadata);
+        }
+
+        private CompressionMetadata applyMetadata(CompressionMetadata metadata)
+        {
+            this.overrideLength = metadata.compressedFileLength;
+            this.bufferSize = metadata.chunkLength();
+            this.bufferType = metadata.compressor().preferredBufferType();
+
+            assert Integer.bitCount(this.bufferSize) == 1; //must be a power of two
+
+            return metadata;
+        }
+
+        @Override
+        public RandomAccessReader build()
+        {
+            return new CompressedRandomAccessReader(this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
deleted file mode 100644
index ea5edaf..0000000
--- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.cassandra.io.compress;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.ICompressedFile;
-
-public class CompressedThrottledReader extends CompressedRandomAccessReader
-{
-    private final RateLimiter limiter;
-
-    public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, RateLimiter limiter)
-    {
-        super(channel, metadata, file);
-        this.limiter = limiter;
-    }
-
-    protected void reBuffer()
-    {
-        limiter.acquire(buffer.capacity());
-        super.reBuffer();
-    }
-
-    public static CompressedThrottledReader open(ICompressedFile file, RateLimiter limiter)
-    {
-        return new CompressedThrottledReader(file.channel(), file.getMetadata(), file, limiter);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index f5d8f7e..1681b0c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -92,7 +92,7 @@ public class CompressionMetadata
     }
 
     @VisibleForTesting
-    CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType)
+    public CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType)
     {
         this.indexFilePath = indexFilePath;
         this.checksumType = checksumType;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 124720e..6f1e2f4 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CloseableIterator;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 522f7a1..5d8ab50 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -828,8 +828,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     if (!summaryLoaded)
                     {
                         summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
-                        ibuilder.addPotentialBoundary(indexPosition);
-                        dbuilder.addPotentialBoundary(indexEntry.position);
                     }
                 }
 
@@ -868,8 +866,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
                     metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
             first = decorateKey(ByteBufferUtil.readWithLength(iStream));
             last = decorateKey(ByteBufferUtil.readWithLength(iStream));
-            ibuilder.deserializeBounds(iStream);
-            dbuilder.deserializeBounds(iStream);
+            ibuilder.deserializeBounds(iStream, descriptor.version);
+            dbuilder.deserializeBounds(iStream, descriptor.version);
         }
         catch (IOException e)
         {
@@ -904,39 +902,35 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         if (ifile == null)
             return false;
 
-        Iterator<FileDataInput> segments = ifile.iterator(0);
         int i = 0;
         int summaryEntriesChecked = 0;
         int expectedIndexInterval = getMinIndexInterval();
-        while (segments.hasNext())
+        String path = null;
+        try (FileDataInput in = ifile.createReader(0))
         {
-            String path = null;
-            try (FileDataInput in = segments.next())
+            path = in.getPath();
+            while (!in.isEOF())
             {
-                path = in.getPath();
-                while (!in.isEOF())
+                ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+                if (i % expectedIndexInterval == 0)
                 {
-                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
-                    if (i % expectedIndexInterval == 0)
-                    {
-                        ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
-                        if (!summaryKey.equals(indexKey))
-                            return false;
-                        summaryEntriesChecked++;
+                    ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
+                    if (!summaryKey.equals(indexKey))
+                        return false;
+                    summaryEntriesChecked++;
 
-                        if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
-                            return true;
-                    }
-                    RowIndexEntry.Serializer.skip(in);
-                    i++;
+                    if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
+                        return true;
                 }
-            }
-            catch (IOException e)
-            {
-                markSuspect();
-                throw new CorruptSSTableException(e, path);
+                RowIndexEntry.Serializer.skip(in);
+                i++;
             }
         }
+        catch (IOException e)
+        {
+            markSuspect();
+            throw new CorruptSSTableException(e, path);
+        }
 
         return true;
     }
@@ -972,8 +966,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
             IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
             ByteBufferUtil.writeWithLength(first.getKey(), oStream);
             ByteBufferUtil.writeWithLength(last.getKey(), oStream);
-            ibuilder.serializeBounds(oStream);
-            dbuilder.serializeBounds(oStream);
+            ibuilder.serializeBounds(oStream, descriptor.version);
+            dbuilder.serializeBounds(oStream, descriptor.version);
         }
         catch (IOException e)
         {
@@ -1600,29 +1594,25 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         if (ifile == null)
             return null;
 
-        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
-        while (segments.hasNext())
+        String path = null;
+        try (FileDataInput in = ifile.createReader(sampledPosition))
         {
-            String path = null;
-            try (FileDataInput in = segments.next();)
+            path = in.getPath();
+            while (!in.isEOF())
             {
-                path = in.getPath();
-                while (!in.isEOF())
-                {
-                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
-                    DecoratedKey indexDecoratedKey = decorateKey(indexKey);
-                    if (indexDecoratedKey.compareTo(token) > 0)
-                        return indexDecoratedKey;
+                ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+                DecoratedKey indexDecoratedKey = decorateKey(indexKey);
+                if (indexDecoratedKey.compareTo(token) > 0)
+                    return indexDecoratedKey;
 
-                    RowIndexEntry.Serializer.skip(in);
-                }
-            }
-            catch (IOException e)
-            {
-                markSuspect();
-                throw new CorruptSSTableException(e, path);
+                RowIndexEntry.Serializer.skip(in);
             }
         }
+        catch (IOException e)
+        {
+            markSuspect();
+            throw new CorruptSSTableException(e, path);
+        }
 
         return null;
     }
@@ -1744,11 +1734,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      */
     public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift);
 
-
-
     public FileDataInput getFileDataInput(long position)
     {
-        return dfile.getSegment(position);
+        return dfile.createReader(position);
     }
 
     /**
@@ -1939,7 +1927,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
     public RandomAccessReader openDataReader(RateLimiter limiter)
     {
         assert limiter != null;
-        return dfile.createThrottledReader(limiter);
+        return dfile.createReader(limiter);
     }
 
     public RandomAccessReader openDataReader()
@@ -1954,6 +1942,16 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
         return null;
     }
 
+    public ChannelProxy getDataChannel()
+    {
+        return dfile.channel;
+    }
+
+    public ChannelProxy getIndexChannel()
+    {
+        return ifile.channel;
+    }
+
     /**
      * @param component component to get timestamp.
      * @return last modified time for given component. 0 if given component does not exist or IO error occurs.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index 4fcf055..16829ab 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -68,6 +68,8 @@ public abstract class Version
 
     public abstract boolean hasCompactionAncestors();
 
+    public abstract boolean hasBoundaries();
+
     public String getVersion()
     {
         return version;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index d65710e..cbc2c39 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -134,6 +134,7 @@ public class BigFormat implements SSTableFormat
         private final boolean newFileName;
         public final boolean storeRows;
         public final int correspondingMessagingVersion; // Only use by storage that 'storeRows' so far
+        public final boolean hasBoundaries;
         /**
          * CASSANDRA-8413: 3.0 bloom filter representation changed (two longs just swapped)
          * have no 'static' bits caused by using the same upper bits for both bloom filter and token distribution.
@@ -176,6 +177,8 @@ public class BigFormat implements SSTableFormat
             correspondingMessagingVersion = storeRows
                                           ? MessagingService.VERSION_30
                                           : MessagingService.VERSION_21;
+
+            hasBoundaries = version.compareTo("ma") < 0;
         }
 
         @Override
@@ -251,6 +254,12 @@ public class BigFormat implements SSTableFormat
         }
 
         @Override
+        public boolean hasBoundaries()
+        {
+            return hasBoundaries;
+        }
+
+        @Override
         public boolean isCompatible()
         {
             return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 87608fd..4b66942 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -178,78 +178,74 @@ public class BigTableReader extends SSTableReader
         // is lesser than the first key of next interval (and in that case we must return the position of the first key
         // of the next interval).
         int i = 0;
-        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
-        while (segments.hasNext())
+        String path = null;
+        try (FileDataInput in = ifile.createReader(sampledPosition))
         {
-            String path = null;
-            try (FileDataInput in = segments.next())
+            path = in.getPath();
+            while (!in.isEOF())
             {
-                path = in.getPath();
-                while (!in.isEOF())
-                {
-                    i++;
+                i++;
 
-                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+                ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 
-                    boolean opSatisfied; // did we find an appropriate position for the op requested
-                    boolean exactMatch; // is the current position an exact match for the key, suitable for caching
+                boolean opSatisfied; // did we find an appropriate position for the op requested
+                boolean exactMatch; // is the current position an exact match for the key, suitable for caching
 
-                    // Compare raw keys if possible for performance, otherwise compare decorated keys.
-                    if (op == Operator.EQ && i <= effectiveInterval)
-                    {
-                        opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
-                    }
-                    else
+                // Compare raw keys if possible for performance, otherwise compare decorated keys.
+                if (op == Operator.EQ && i <= effectiveInterval)
+                {
+                    opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
+                }
+                else
+                {
+                    DecoratedKey indexDecoratedKey = decorateKey(indexKey);
+                    int comparison = indexDecoratedKey.compareTo(key);
+                    int v = op.apply(comparison);
+                    opSatisfied = (v == 0);
+                    exactMatch = (comparison == 0);
+                    if (v < 0)
                     {
-                        DecoratedKey indexDecoratedKey = decorateKey(indexKey);
-                        int comparison = indexDecoratedKey.compareTo(key);
-                        int v = op.apply(comparison);
-                        opSatisfied = (v == 0);
-                        exactMatch = (comparison == 0);
-                        if (v < 0)
-                        {
-                            Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
-                            return null;
-                        }
+                        Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
+                        return null;
                     }
+                }
 
-                    if (opSatisfied)
+                if (opSatisfied)
+                {
+                    // read data position from index entry
+                    RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in);
+                    if (exactMatch && updateCacheAndStats)
                     {
-                        // read data position from index entry
-                        RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in);
-                        if (exactMatch && updateCacheAndStats)
-                        {
-                            assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
-                            DecoratedKey decoratedKey = (DecoratedKey)key;
+                        assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
+                        DecoratedKey decoratedKey = (DecoratedKey)key;
 
-                            if (logger.isTraceEnabled())
+                        if (logger.isTraceEnabled())
+                        {
+                            // expensive sanity check!  see CASSANDRA-4687
+                            try (FileDataInput fdi = dfile.createReader(indexEntry.position))
                             {
-                                // expensive sanity check!  see CASSANDRA-4687
-                                try (FileDataInput fdi = dfile.getSegment(indexEntry.position))
-                                {
-                                    DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi));
-                                    if (!keyInDisk.equals(key))
-                                        throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
-                                }
+                                DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+                                if (!keyInDisk.equals(key))
+                                    throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
                             }
-
-                            // store exact match for the key
-                            cacheKey(decoratedKey, indexEntry);
                         }
-                        if (op == Operator.EQ && updateCacheAndStats)
-                            bloomFilterTracker.addTruePositive();
-                        Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
-                        return indexEntry;
-                    }
 
-                    RowIndexEntry.Serializer.skip(in);
+                        // store exact match for the key
+                        cacheKey(decoratedKey, indexEntry);
+                    }
+                    if (op == Operator.EQ && updateCacheAndStats)
+                        bloomFilterTracker.addTruePositive();
+                    Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
+                    return indexEntry;
                 }
+
+                RowIndexEntry.Serializer.skip(in);
             }
-            catch (IOException e)
-            {
-                markSuspect();
-                throw new CorruptSSTableException(e, path);
-            }
+        }
+        catch (IOException e)
+        {
+            markSuspect();
+            throw new CorruptSSTableException(e, path);
         }
 
         if (op == SSTableReader.Operator.EQ && updateCacheAndStats)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 77bf3d6..38dab9a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -121,7 +121,6 @@ public class BigTableWriter extends SSTableWriter
         if (logger.isTraceEnabled())
             logger.trace("wrote {} at {}", decoratedKey, dataEnd);
         iwriter.append(decoratedKey, index, dataEnd);
-        dbuilder.addPotentialBoundary(dataEnd);
     }
 
     /**
@@ -420,7 +419,6 @@ public class BigTableWriter extends SSTableWriter
                 logger.trace("wrote index entry: {} at {}", indexEntry, indexStart);
 
             summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
-            builder.addPotentialBoundary(indexStart);
         }
 
         /**