You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/09/04 13:46:30 UTC
[09/10] cassandra git commit: Faster sequential IO (CASSANDRA-8630)
Faster sequential IO (CASSANDRA-8630)
Merge RandomAccessReader and NIODataInputStream class hierarchies
to share performance optimisation work across all readers.
patch by stefania; reviewed by ariel and benedict for CASSANDRA-8630
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce63ccc8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce63ccc8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce63ccc8
Branch: refs/heads/cassandra-3.0
Commit: ce63ccc842dc6e7129765391c611402eb02a3a23
Parents: 7c0bfe9
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Jul 27 16:34:46 2015 +0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Sep 4 12:42:35 2015 +0100
----------------------------------------------------------------------
build.xml | 8 +-
lib/licenses/ohc-0.3.4.txt | 201 --------
lib/licenses/ohc-0.4.2.txt | 201 ++++++++
lib/ohc-core-0.4.2.jar | Bin 0 -> 126802 bytes
lib/ohc-core-0.4.jar | Bin 127890 -> 0 bytes
lib/ohc-core-j8-0.4.2.jar | Bin 0 -> 4994 bytes
lib/ohc-core-j8-0.4.jar | Bin 4989 -> 0 bytes
.../apache/cassandra/cache/AutoSavingCache.java | 2 +-
.../org/apache/cassandra/cache/OHCProvider.java | 3 +-
.../org/apache/cassandra/db/SystemKeyspace.java | 2 +-
.../db/commitlog/CommitLogReplayer.java | 19 +-
.../cassandra/hints/ChecksummedDataInput.java | 128 +++--
.../org/apache/cassandra/hints/HintsReader.java | 67 ++-
.../compress/CompressedRandomAccessReader.java | 147 +++---
.../io/compress/CompressedThrottledReader.java | 48 --
.../io/compress/CompressionMetadata.java | 2 +-
.../cassandra/io/sstable/KeyIterator.java | 1 -
.../io/sstable/format/SSTableReader.java | 100 ++--
.../cassandra/io/sstable/format/Version.java | 2 +
.../io/sstable/format/big/BigFormat.java | 9 +
.../io/sstable/format/big/BigTableReader.java | 108 ++---
.../io/sstable/format/big/BigTableWriter.java | 2 -
.../cassandra/io/util/AbstractDataInput.java | 343 -------------
.../io/util/BufferedSegmentedFile.java | 24 -
.../cassandra/io/util/ByteBufferDataInput.java | 171 -------
.../apache/cassandra/io/util/ChannelProxy.java | 4 +-
.../io/util/ChecksummedRandomAccessReader.java | 62 ++-
.../io/util/CompressedSegmentedFile.java | 100 ++--
.../cassandra/io/util/DataInputBuffer.java | 24 +-
.../io/util/DataIntegrityMetadata.java | 11 +-
.../apache/cassandra/io/util/FileDataInput.java | 25 +-
.../io/util/FileSegmentInputStream.java | 96 ++++
.../cassandra/io/util/ICompressedFile.java | 9 +-
.../cassandra/io/util/MemoryInputStream.java | 54 ++-
.../cassandra/io/util/MmappedRegions.java | 344 +++++++++++++
.../cassandra/io/util/MmappedSegmentedFile.java | 214 +++-----
.../cassandra/io/util/NIODataInputStream.java | 352 +-------------
.../cassandra/io/util/RandomAccessReader.java | 417 ++++++++++------
.../io/util/RebufferingInputStream.java | 286 +++++++++++
.../apache/cassandra/io/util/SegmentedFile.java | 115 ++---
.../cassandra/io/util/ThrottledReader.java | 48 --
.../compress/CompressedStreamWriter.java | 10 +-
.../apache/cassandra/utils/ByteBufferUtil.java | 3 -
.../org/apache/cassandra/utils/Throwables.java | 45 +-
.../apache/cassandra/utils/vint/VIntCoding.java | 2 +-
.../cassandra/db/commitlog/CommitLogTest.java | 8 +-
.../db/commitlog/CommitLogTestReplayer.java | 3 +-
.../hints/ChecksummedDataInputTest.java | 227 +++++++--
.../io/ChecksummedRandomAccessReaderTest.java | 127 -----
.../cassandra/io/RandomAccessReaderTest.java | 269 -----------
.../CompressedRandomAccessReaderTest.java | 245 +++++-----
.../CompressedSequentialWriterTest.java | 2 +-
.../cassandra/io/sstable/SSTableReaderTest.java | 3 +-
.../io/util/BufferedDataOutputStreamTest.java | 12 +-
.../io/util/BufferedRandomAccessFileTest.java | 91 +---
.../util/ChecksummedRandomAccessReaderTest.java | 127 +++++
.../io/util/FileSegmentInputStreamTest.java | 131 +++++
.../apache/cassandra/io/util/MemoryTest.java | 37 ++
.../cassandra/io/util/MmappedRegionsTest.java | 375 ++++++++++++++
.../io/util/NIODataInputStreamTest.java | 16 +-
.../io/util/RandomAccessReaderTest.java | 483 +++++++++++++++++++
61 files changed, 3339 insertions(+), 2626 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 28f08d6..252b1a8 100644
--- a/build.xml
+++ b/build.xml
@@ -415,8 +415,8 @@
<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.5" classifier="shaded" />
-->
<dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj" version="4.4.2" />
- <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4" />
- <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4" />
+ <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.2" />
+ <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4.2" />
<dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations" version="1.2.0" />
<dependency groupId="org.fusesource" artifactId="sigar" version="1.6.4">
<exclusion groupId="log4j" artifactId="log4j"/>
@@ -470,8 +470,8 @@
<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
-->
<dependency groupId="org.eclipse.jdt.core.compiler" artifactId="ecj"/>
- <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4" />
- <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4" />
+ <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.4.2" />
+ <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core-j8" version="0.4.2" />
<dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
<dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess"/>
<dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/>
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/licenses/ohc-0.3.4.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/ohc-0.3.4.txt b/lib/licenses/ohc-0.3.4.txt
deleted file mode 100644
index eb6b5d3..0000000
--- a/lib/licenses/ohc-0.3.4.txt
+++ /dev/null
@@ -1,201 +0,0 @@
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright 2014 Robert Stupp, Koeln, Germany, robert-stupp.de
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/licenses/ohc-0.4.2.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/ohc-0.4.2.txt b/lib/licenses/ohc-0.4.2.txt
new file mode 100644
index 0000000..eb6b5d3
--- /dev/null
+++ b/lib/licenses/ohc-0.4.2.txt
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2014 Robert Stupp, Koeln, Germany, robert-stupp.de
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-0.4.2.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-0.4.2.jar b/lib/ohc-core-0.4.2.jar
new file mode 100644
index 0000000..019adcd
Binary files /dev/null and b/lib/ohc-core-0.4.2.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-0.4.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-0.4.jar b/lib/ohc-core-0.4.jar
deleted file mode 100644
index 1b1b939..0000000
Binary files a/lib/ohc-core-0.4.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-j8-0.4.2.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-j8-0.4.2.jar b/lib/ohc-core-j8-0.4.2.jar
new file mode 100644
index 0000000..583f4aa
Binary files /dev/null and b/lib/ohc-core-j8-0.4.2.jar differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/lib/ohc-core-j8-0.4.jar
----------------------------------------------------------------------
diff --git a/lib/ohc-core-j8-0.4.jar b/lib/ohc-core-j8-0.4.jar
deleted file mode 100644
index f97ddf5..0000000
Binary files a/lib/ohc-core-j8-0.4.jar and /dev/null differ
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 8650d47..ebd2830 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -68,7 +68,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
{
public InputStream getInputStream(File dataPath, File crcPath) throws IOException
{
- return ChecksummedRandomAccessReader.open(dataPath, crcPath);
+ return new ChecksummedRandomAccessReader.Builder(dataPath, crcPath).build();
}
public OutputStream getOutputStream(File dataPath, File crcPath)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
index b0b4521..c6c6bb7 100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBufferFixed;
import org.apache.cassandra.io.util.NIODataInputStream;
+import org.apache.cassandra.io.util.RebufferingInputStream;
import org.caffinitas.ohc.OHCache;
import org.caffinitas.ohc.OHCacheBuilder;
@@ -172,7 +173,7 @@ public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
{
try
{
- NIODataInputStream in = new DataInputBuffer(buf, false);
+ RebufferingInputStream in = new DataInputBuffer(buf, false);
boolean isSentinel = in.readBoolean();
if (isSentinel)
return new RowCacheSentinel(in.readLong());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index cf8e14d..d54ee8b 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -676,7 +676,7 @@ public final class SystemKeyspace
{
try
{
- NIODataInputStream in = new DataInputBuffer(bytes, true);
+ RebufferingInputStream in = new DataInputBuffer(bytes, true);
return Pair.create(ReplayPosition.serializer.deserialize(in), in.available() > 0 ? in.readLong() : Long.MIN_VALUE);
}
catch (IOException e)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 4f50008..7049191 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -47,12 +47,13 @@ import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.util.FileSegmentInputStream;
+import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.io.compress.ICompressor;
-import org.apache.cassandra.io.util.ByteBufferDataInput;
+import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.NIODataInputStream;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.FBUtilities;
@@ -290,8 +291,8 @@ public class CommitLogReplayer
public void recover(File file, boolean tolerateTruncation) throws IOException
{
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
- RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
- try
+ try(ChannelProxy channel = new ChannelProxy(file);
+ RandomAccessReader reader = RandomAccessReader.open(channel))
{
if (desc.version < CommitLogDescriptor.VERSION_21)
{
@@ -299,7 +300,7 @@ public class CommitLogReplayer
return;
if (globalPosition.segment == desc.id)
reader.seek(globalPosition.position);
- replaySyncSection(reader, (int) reader.getPositionLimit(), desc, desc.fileName(), tolerateTruncation);
+ replaySyncSection(reader, (int) reader.length(), desc, desc.fileName(), tolerateTruncation);
return;
}
@@ -388,7 +389,7 @@ public class CommitLogReplayer
if (uncompressedLength > uncompressedBuffer.length)
uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)];
compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0);
- sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0);
+ sectionReader = new FileSegmentInputStream(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos);
errorContext = "compressed section at " + start + " in " + errorContext;
}
catch (IOException | ArrayIndexOutOfBoundsException e)
@@ -403,10 +404,6 @@ public class CommitLogReplayer
if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection))
break;
}
- }
- finally
- {
- FileUtils.closeQuietly(reader);
logger.info("Finished reading {}", file);
}
}
@@ -522,7 +519,7 @@ public class CommitLogReplayer
{
final Mutation mutation;
- try (NIODataInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
+ try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
{
mutation = Mutation.serializer.deserialize(bufIn,
desc.getMessagingVersion(),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
index fa727bc..543f14e 100644
--- a/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/ChecksummedDataInput.java
@@ -17,98 +17,148 @@
*/
package org.apache.cassandra.hints;
+import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.zip.CRC32;
-import org.apache.cassandra.io.util.AbstractDataInput;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.RandomAccessReader;
/**
- * An {@link AbstractDataInput} wrapper that calctulates the CRC in place.
+ * A {@link RandomAccessReader} wrapper that calctulates the CRC in place.
*
* Useful for {@link org.apache.cassandra.hints.HintsReader}, for example, where we must verify the CRC, yet don't want
- * to allocate an extra byte array just that purpose.
+ * to allocate an extra byte array just that purpose. The CRC can be embedded in the input stream and checked via checkCrc().
*
- * In addition to calculating the CRC, allows to enforce a maximim known size. This is needed
+ * In addition to calculating the CRC, it allows to enforce a maximim known size. This is needed
* so that {@link org.apache.cassandra.db.Mutation.MutationSerializer} doesn't blow up the heap when deserializing a
* corrupted sequence by reading a huge corrupted length of bytes via
* via {@link org.apache.cassandra.utils.ByteBufferUtil#readWithLength(java.io.DataInput)}.
*/
-public final class ChecksummedDataInput extends AbstractDataInput
+public final class ChecksummedDataInput extends RandomAccessReader.RandomAccessReaderWithOwnChannel
{
private final CRC32 crc;
- private final AbstractDataInput source;
- private int limit;
+ private int crcPosition;
+ private boolean crcUpdateDisabled;
- private ChecksummedDataInput(AbstractDataInput source)
+ private long limit;
+ private FileMark limitMark;
+
+ private ChecksummedDataInput(Builder builder)
{
- this.source = source;
+ super(builder);
crc = new CRC32();
- limit = Integer.MAX_VALUE;
+ crcPosition = 0;
+ crcUpdateDisabled = false;
+
+ resetLimit();
}
- public static ChecksummedDataInput wrap(AbstractDataInput source)
+ public static ChecksummedDataInput open(File file)
{
- return new ChecksummedDataInput(source);
+ return new Builder(new ChannelProxy(file)).build();
}
public void resetCrc()
{
crc.reset();
+ crcPosition = buffer.position();
}
- public void resetLimit()
+ public void limit(long newLimit)
{
- limit = Integer.MAX_VALUE;
+ limit = newLimit;
+ limitMark = mark();
}
- public void limit(int newLimit)
+ public void resetLimit()
{
- limit = newLimit;
+ limit = Long.MAX_VALUE;
+ limitMark = null;
}
- public int bytesRemaining()
+ public void checkLimit(int length) throws IOException
{
- return limit;
+ if (limitMark == null)
+ return;
+
+ if ((bytesPastLimit() + length) > limit)
+ throw new IOException("Digest mismatch exception");
}
- public int getCrc()
+ public long bytesPastLimit()
{
- return (int) crc.getValue();
+ assert limitMark != null;
+ return bytesPastMark(limitMark);
}
- public void seek(long position) throws IOException
+ public boolean checkCrc() throws IOException
{
- source.seek(position);
+ try
+ {
+ updateCrc();
+
+ // we must diable crc updates in case we rebuffer
+ // when called source.readInt()
+ crcUpdateDisabled = true;
+ return ((int) crc.getValue()) == readInt();
+ }
+ finally
+ {
+ crcPosition = buffer.position();
+ crcUpdateDisabled = false;
+ }
}
- public long getPosition()
+ @Override
+ public void readFully(byte[] b) throws IOException
{
- return source.getPosition();
+ checkLimit(b.length);
+ super.readFully(b);
}
- public long getPositionLimit()
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
{
- return source.getPositionLimit();
+ checkLimit(len);
+ return super.read(b, off, len);
}
- public int read() throws IOException
+ @Override
+ public void reBuffer()
{
- int b = source.read();
- crc.update(b);
- limit--;
- return b;
+ updateCrc();
+ super.reBuffer();
+ crcPosition = buffer.position();
}
- @Override
- public int read(byte[] buff, int offset, int length) throws IOException
+ private void updateCrc()
{
- if (length > limit)
- throw new IOException("Digest mismatch exception");
+ if (crcPosition == buffer.position() | crcUpdateDisabled)
+ return;
+
+ assert crcPosition >= 0 && crcPosition < buffer.position();
- int copied = source.read(buff, offset, length);
- crc.update(buff, offset, copied);
- limit -= copied;
- return copied;
+ ByteBuffer unprocessed = buffer.duplicate();
+ unprocessed.position(crcPosition)
+ .limit(buffer.position());
+
+ crc.update(unprocessed);
+ }
+
+ public final static class Builder extends RandomAccessReader.Builder
+ {
+ public Builder(ChannelProxy channel)
+ {
+ super(channel);
+ }
+
+ public ChecksummedDataInput build()
+ {
+ return new ChecksummedDataInput(this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/hints/HintsReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsReader.java b/src/java/org/apache/cassandra/hints/HintsReader.java
index 7d164b4..bc83654 100644
--- a/src/java/org/apache/cassandra/hints/HintsReader.java
+++ b/src/java/org/apache/cassandra/hints/HintsReader.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import javax.annotation.Nullable;
+import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
@@ -32,7 +33,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CLibrary;
@@ -57,25 +57,23 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
private final HintsDescriptor descriptor;
private final File file;
- private final RandomAccessReader reader;
- private final ChecksummedDataInput crcInput;
+ private final ChecksummedDataInput input;
// we pass the RateLimiter into HintsReader itself because it's cheaper to calculate the size before the hint is deserialized
@Nullable
private final RateLimiter rateLimiter;
- private HintsReader(HintsDescriptor descriptor, File file, RandomAccessReader reader, RateLimiter rateLimiter)
+ private HintsReader(HintsDescriptor descriptor, File file, ChecksummedDataInput reader, RateLimiter rateLimiter)
{
this.descriptor = descriptor;
this.file = file;
- this.reader = reader;
- this.crcInput = ChecksummedDataInput.wrap(reader);
+ this.input = reader;
this.rateLimiter = rateLimiter;
}
static HintsReader open(File file, RateLimiter rateLimiter)
{
- RandomAccessReader reader = RandomAccessReader.open(file);
+ ChecksummedDataInput reader = ChecksummedDataInput.open(file);
try
{
HintsDescriptor descriptor = HintsDescriptor.deserialize(reader);
@@ -83,7 +81,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
}
catch (IOException e)
{
- reader.close();
+ FileUtils.closeQuietly(reader);
throw new FSReadError(e, file);
}
}
@@ -95,7 +93,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
public void close()
{
- FileUtils.closeQuietly(reader);
+ FileUtils.closeQuietly(input);
}
public HintsDescriptor descriptor()
@@ -105,7 +103,7 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
void seek(long newPosition)
{
- reader.seek(newPosition);
+ input.seek(newPosition);
}
public Iterator<Page> iterator()
@@ -138,12 +136,12 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
@SuppressWarnings("resource")
protected Page computeNext()
{
- CLibrary.trySkipCache(reader.getChannel().getFileDescriptor(), 0, reader.getFilePointer(), reader.getPath());
+ CLibrary.trySkipCache(input.getChannel().getFileDescriptor(), 0, input.getFilePointer(), input.getPath());
- if (reader.length() == reader.getFilePointer())
+ if (input.length() == input.getFilePointer())
return endOfData();
- return new Page(reader.getFilePointer());
+ return new Page(input.getFilePointer());
}
}
@@ -166,9 +164,9 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
do
{
- long position = reader.getFilePointer();
+ long position = input.getFilePointer();
- if (reader.length() == position)
+ if (input.length() == position)
return endOfData(); // reached EOF
if (position - offset >= PAGE_SIZE)
@@ -190,13 +188,13 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
private Hint computeNextInternal() throws IOException
{
- crcInput.resetCrc();
- crcInput.resetLimit();
+ input.resetCrc();
+ input.resetLimit();
- int size = crcInput.readInt();
+ int size = input.readInt();
// if we cannot corroborate the size via crc, then we cannot safely skip this hint
- if (reader.readInt() != crcInput.getCrc())
+ if (!input.checkCrc())
throw new IOException("Digest mismatch exception");
return readHint(size);
@@ -206,12 +204,13 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
if (rateLimiter != null)
rateLimiter.acquire(size);
- crcInput.limit(size);
+ input.limit(size);
Hint hint;
try
{
- hint = Hint.serializer.deserialize(crcInput, descriptor.messagingVersion());
+ hint = Hint.serializer.deserialize(input, descriptor.messagingVersion());
+ input.checkLimit(0);
}
catch (UnknownColumnFamilyException e)
{
@@ -219,18 +218,18 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
descriptor.hostId,
e.cfId,
descriptor.fileName());
- reader.skipBytes(crcInput.bytesRemaining());
+ input.skipBytes(Ints.checkedCast(size - input.bytesPastLimit()));
return null;
}
- if (reader.readInt() == crcInput.getCrc())
+ if (input.checkCrc())
return hint;
// log a warning and skip the corrupted entry
logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
descriptor.hostId,
- crcInput.getPosition() - size - 4,
+ input.getPosition() - size - 4,
descriptor.fileName());
return null;
}
@@ -255,9 +254,9 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
do
{
- long position = reader.getFilePointer();
+ long position = input.getFilePointer();
- if (reader.length() == position)
+ if (input.length() == position)
return endOfData(); // reached EOF
if (position - offset >= PAGE_SIZE)
@@ -279,13 +278,13 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
private ByteBuffer computeNextInternal() throws IOException
{
- crcInput.resetCrc();
- crcInput.resetLimit();
+ input.resetCrc();
+ input.resetLimit();
- int size = crcInput.readInt();
+ int size = input.readInt();
// if we cannot corroborate the size via crc, then we cannot safely skip this hint
- if (reader.readInt() != crcInput.getCrc())
+ if (!input.checkCrc())
throw new IOException("Digest mismatch exception");
return readBuffer(size);
@@ -295,16 +294,16 @@ final class HintsReader implements AutoCloseable, Iterable<HintsReader.Page>
{
if (rateLimiter != null)
rateLimiter.acquire(size);
- crcInput.limit(size);
+ input.limit(size);
- ByteBuffer buffer = ByteBufferUtil.read(crcInput, size);
- if (reader.readInt() == crcInput.getCrc())
+ ByteBuffer buffer = ByteBufferUtil.read(input, size);
+ if (input.checkCrc())
return buffer;
// log a warning and skip the corrupted entry
logger.warn("Failed to read a hint for {} - digest mismatch for hint at position {} in file {}",
descriptor.hostId,
- crcInput.getPosition() - size - 4,
+ input.getPosition() - size - 4,
descriptor.fileName());
return null;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index c38f4d2..0242871 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -19,11 +19,7 @@ package org.apache.cassandra.io.compress;
import java.io.*;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.zip.Adler32;
import java.util.zip.Checksum;
import com.google.common.primitives.Ints;
@@ -31,7 +27,6 @@ import com.google.common.primitives.Ints;
import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.util.*;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.memory.BufferPool;
/**
@@ -40,18 +35,6 @@ import org.apache.cassandra.utils.memory.BufferPool;
*/
public class CompressedRandomAccessReader extends RandomAccessReader
{
- public static CompressedRandomAccessReader open(ChannelProxy channel, CompressionMetadata metadata)
- {
- return new CompressedRandomAccessReader(channel, metadata, null);
- }
-
- public static CompressedRandomAccessReader open(ICompressedFile file)
- {
- return new CompressedRandomAccessReader(file.channel(), file.getMetadata(), file);
- }
-
- private final TreeMap<Long, MappedByteBuffer> chunkSegments;
-
private final CompressionMetadata metadata;
// we read the raw compressed bytes into this buffer, then move the uncompressed ones into super.buffer.
@@ -63,39 +46,59 @@ public class CompressedRandomAccessReader extends RandomAccessReader
// raw checksum bytes
private ByteBuffer checksumBytes;
- protected CompressedRandomAccessReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file)
+ protected CompressedRandomAccessReader(Builder builder)
{
- super(channel, metadata.chunkLength(), metadata.compressedFileLength, metadata.compressor().preferredBufferType());
- this.metadata = metadata;
- checksum = metadata.checksumType.newInstance();
+ super(builder.initializeBuffers(false));
+ this.metadata = builder.metadata;
+ this.checksum = metadata.checksumType.newInstance();
- chunkSegments = file == null ? null : file.chunkSegments();
- if (chunkSegments == null)
- {
- compressed = allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()), metadata.compressor().preferredBufferType());
- checksumBytes = ByteBuffer.wrap(new byte[4]);
- }
+ initializeBuffer();
}
- protected int getBufferSize(int size)
+ @Override
+ protected int getBufferSize(RandomAccessReader.Builder builder)
{
- assert Integer.bitCount(size) == 1; //must be a power of two
- return size;
+ // this is the chunk data length, throttling is OK with this
+ return builder.bufferSize;
}
@Override
- public void close()
+ protected void initializeBuffer()
{
- super.close();
+ buffer = allocateBuffer(bufferSize);
+ buffer.limit(0);
- if (compressed != null)
+ if (regions == null)
{
- BufferPool.put(compressed);
- compressed = null;
+ compressed = allocateBuffer(metadata.compressor().initialCompressedBufferLength(metadata.chunkLength()));
+ checksumBytes = ByteBuffer.wrap(new byte[4]);
}
}
- private void reBufferStandard()
+ @Override
+ protected void releaseBuffer()
+ {
+ try
+ {
+ if (buffer != null)
+ {
+ BufferPool.put(buffer);
+ buffer = null;
+ }
+ }
+ finally
+ {
+ // this will always be null if using mmap access mode (unlike in parent, where buffer is set to a region)
+ if (compressed != null)
+ {
+ BufferPool.put(compressed);
+ compressed = null;
+ }
+ }
+ }
+
+ @Override
+ protected void reBufferStandard()
{
try
{
@@ -105,13 +108,19 @@ public class CompressedRandomAccessReader extends RandomAccessReader
CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
if (compressed.capacity() < chunk.length)
- compressed = allocateBuffer(chunk.length, metadata.compressor().preferredBufferType());
+ {
+ BufferPool.put(compressed);
+ compressed = allocateBuffer(chunk.length);
+ }
else
+ {
compressed.clear();
- compressed.limit(chunk.length);
+ }
+ compressed.limit(chunk.length);
if (channel.read(compressed, chunk.offset) != chunk.length)
throw new CorruptBlockException(getPath(), chunk);
+
compressed.flip();
buffer.clear();
@@ -158,7 +167,8 @@ public class CompressedRandomAccessReader extends RandomAccessReader
}
}
- private void reBufferMmap()
+ @Override
+ protected void reBufferMmap()
{
try
{
@@ -167,10 +177,10 @@ public class CompressedRandomAccessReader extends RandomAccessReader
CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
- Map.Entry<Long, MappedByteBuffer> entry = chunkSegments.floorEntry(chunk.offset);
- long segmentOffset = entry.getKey();
+ MmappedRegions.Region region = regions.floor(chunk.offset);
+ long segmentOffset = region.bottom();
int chunkOffset = Ints.checkedCast(chunk.offset - segmentOffset);
- ByteBuffer compressedChunk = entry.getValue().duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java
+ ByteBuffer compressedChunk = region.buffer.duplicate(); // TODO: change to slice(chunkOffset) when we upgrade LZ4-java
compressedChunk.position(chunkOffset).limit(chunkOffset + chunk.length);
@@ -218,19 +228,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader
}
- @Override
- protected void reBuffer()
- {
- if (chunkSegments != null)
- {
- reBufferMmap();
- }
- else
- {
- reBufferStandard();
- }
- }
-
private int checksum(CompressionMetadata.Chunk chunk) throws IOException
{
long position = chunk.offset + chunk.length;
@@ -240,11 +237,6 @@ public class CompressedRandomAccessReader extends RandomAccessReader
return checksumBytes.getInt(0);
}
- public int getTotalBufferSize()
- {
- return super.getTotalBufferSize() + (chunkSegments != null ? 0 : compressed.capacity());
- }
-
@Override
public long length()
{
@@ -256,4 +248,39 @@ public class CompressedRandomAccessReader extends RandomAccessReader
{
return String.format("%s - chunk length %d, data length %d.", getPath(), metadata.chunkLength(), metadata.dataLength);
}
+
+ public final static class Builder extends RandomAccessReader.Builder
+ {
+ private final CompressionMetadata metadata;
+
+ public Builder(ICompressedFile file)
+ {
+ super(file.channel());
+ this.metadata = applyMetadata(file.getMetadata());
+ this.regions = file.regions();
+ }
+
+ public Builder(ChannelProxy channel, CompressionMetadata metadata)
+ {
+ super(channel);
+ this.metadata = applyMetadata(metadata);
+ }
+
+ private CompressionMetadata applyMetadata(CompressionMetadata metadata)
+ {
+ this.overrideLength = metadata.compressedFileLength;
+ this.bufferSize = metadata.chunkLength();
+ this.bufferType = metadata.compressor().preferredBufferType();
+
+ assert Integer.bitCount(this.bufferSize) == 1; //must be a power of two
+
+ return metadata;
+ }
+
+ @Override
+ public RandomAccessReader build()
+ {
+ return new CompressedRandomAccessReader(this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java b/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
deleted file mode 100644
index ea5edaf..0000000
--- a/src/java/org/apache/cassandra/io/compress/CompressedThrottledReader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.apache.cassandra.io.compress;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.io.util.ChannelProxy;
-import org.apache.cassandra.io.util.ICompressedFile;
-
-public class CompressedThrottledReader extends CompressedRandomAccessReader
-{
- private final RateLimiter limiter;
-
- public CompressedThrottledReader(ChannelProxy channel, CompressionMetadata metadata, ICompressedFile file, RateLimiter limiter)
- {
- super(channel, metadata, file);
- this.limiter = limiter;
- }
-
- protected void reBuffer()
- {
- limiter.acquire(buffer.capacity());
- super.reBuffer();
- }
-
- public static CompressedThrottledReader open(ICompressedFile file, RateLimiter limiter)
- {
- return new CompressedThrottledReader(file.channel(), file.getMetadata(), file, limiter);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index f5d8f7e..1681b0c 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -92,7 +92,7 @@ public class CompressionMetadata
}
@VisibleForTesting
- CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType)
+ public CompressionMetadata(String indexFilePath, long compressedLength, ChecksumType checksumType)
{
this.indexFilePath = indexFilePath;
this.checksumType = checksumType;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 124720e..6f1e2f4 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.CloseableIterator;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 522f7a1..5d8ab50 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -828,8 +828,6 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (!summaryLoaded)
{
summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
- ibuilder.addPotentialBoundary(indexPosition);
- dbuilder.addPotentialBoundary(indexEntry.position);
}
}
@@ -868,8 +866,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
metadata.params.minIndexInterval, metadata.params.maxIndexInterval);
first = decorateKey(ByteBufferUtil.readWithLength(iStream));
last = decorateKey(ByteBufferUtil.readWithLength(iStream));
- ibuilder.deserializeBounds(iStream);
- dbuilder.deserializeBounds(iStream);
+ ibuilder.deserializeBounds(iStream, descriptor.version);
+ dbuilder.deserializeBounds(iStream, descriptor.version);
}
catch (IOException e)
{
@@ -904,39 +902,35 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (ifile == null)
return false;
- Iterator<FileDataInput> segments = ifile.iterator(0);
int i = 0;
int summaryEntriesChecked = 0;
int expectedIndexInterval = getMinIndexInterval();
- while (segments.hasNext())
+ String path = null;
+ try (FileDataInput in = ifile.createReader(0))
{
- String path = null;
- try (FileDataInput in = segments.next())
+ path = in.getPath();
+ while (!in.isEOF())
{
- path = in.getPath();
- while (!in.isEOF())
+ ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+ if (i % expectedIndexInterval == 0)
{
- ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
- if (i % expectedIndexInterval == 0)
- {
- ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
- if (!summaryKey.equals(indexKey))
- return false;
- summaryEntriesChecked++;
+ ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
+ if (!summaryKey.equals(indexKey))
+ return false;
+ summaryEntriesChecked++;
- if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
- return true;
- }
- RowIndexEntry.Serializer.skip(in);
- i++;
+ if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
+ return true;
}
- }
- catch (IOException e)
- {
- markSuspect();
- throw new CorruptSSTableException(e, path);
+ RowIndexEntry.Serializer.skip(in);
+ i++;
}
}
+ catch (IOException e)
+ {
+ markSuspect();
+ throw new CorruptSSTableException(e, path);
+ }
return true;
}
@@ -972,8 +966,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
ByteBufferUtil.writeWithLength(first.getKey(), oStream);
ByteBufferUtil.writeWithLength(last.getKey(), oStream);
- ibuilder.serializeBounds(oStream);
- dbuilder.serializeBounds(oStream);
+ ibuilder.serializeBounds(oStream, descriptor.version);
+ dbuilder.serializeBounds(oStream, descriptor.version);
}
catch (IOException e)
{
@@ -1600,29 +1594,25 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
if (ifile == null)
return null;
- Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
- while (segments.hasNext())
+ String path = null;
+ try (FileDataInput in = ifile.createReader(sampledPosition))
{
- String path = null;
- try (FileDataInput in = segments.next();)
+ path = in.getPath();
+ while (!in.isEOF())
{
- path = in.getPath();
- while (!in.isEOF())
- {
- ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
- DecoratedKey indexDecoratedKey = decorateKey(indexKey);
- if (indexDecoratedKey.compareTo(token) > 0)
- return indexDecoratedKey;
+ ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+ DecoratedKey indexDecoratedKey = decorateKey(indexKey);
+ if (indexDecoratedKey.compareTo(token) > 0)
+ return indexDecoratedKey;
- RowIndexEntry.Serializer.skip(in);
- }
- }
- catch (IOException e)
- {
- markSuspect();
- throw new CorruptSSTableException(e, path);
+ RowIndexEntry.Serializer.skip(in);
}
}
+ catch (IOException e)
+ {
+ markSuspect();
+ throw new CorruptSSTableException(e, path);
+ }
return null;
}
@@ -1744,11 +1734,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
*/
public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, RateLimiter limiter, boolean isForThrift);
-
-
public FileDataInput getFileDataInput(long position)
{
- return dfile.getSegment(position);
+ return dfile.createReader(position);
}
/**
@@ -1939,7 +1927,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
public RandomAccessReader openDataReader(RateLimiter limiter)
{
assert limiter != null;
- return dfile.createThrottledReader(limiter);
+ return dfile.createReader(limiter);
}
public RandomAccessReader openDataReader()
@@ -1954,6 +1942,16 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
return null;
}
+ public ChannelProxy getDataChannel()
+ {
+ return dfile.channel;
+ }
+
+ public ChannelProxy getIndexChannel()
+ {
+ return ifile.channel;
+ }
+
/**
* @param component component to get timestamp.
* @return last modified time for given component. 0 if given component does not exist or IO error occurs.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/Version.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/Version.java b/src/java/org/apache/cassandra/io/sstable/format/Version.java
index 4fcf055..16829ab 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/Version.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/Version.java
@@ -68,6 +68,8 @@ public abstract class Version
public abstract boolean hasCompactionAncestors();
+ public abstract boolean hasBoundaries();
+
public String getVersion()
{
return version;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index d65710e..cbc2c39 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -134,6 +134,7 @@ public class BigFormat implements SSTableFormat
private final boolean newFileName;
public final boolean storeRows;
public final int correspondingMessagingVersion; // Only use by storage that 'storeRows' so far
+ public final boolean hasBoundaries;
/**
* CASSANDRA-8413: 3.0 bloom filter representation changed (two longs just swapped)
* have no 'static' bits caused by using the same upper bits for both bloom filter and token distribution.
@@ -176,6 +177,8 @@ public class BigFormat implements SSTableFormat
correspondingMessagingVersion = storeRows
? MessagingService.VERSION_30
: MessagingService.VERSION_21;
+
+ hasBoundaries = version.compareTo("ma") < 0;
}
@Override
@@ -251,6 +254,12 @@ public class BigFormat implements SSTableFormat
}
@Override
+ public boolean hasBoundaries()
+ {
+ return hasBoundaries;
+ }
+
+ @Override
public boolean isCompatible()
{
return version.compareTo(earliest_supported_version) >= 0 && version.charAt(0) <= current_version.charAt(0);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 87608fd..4b66942 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -178,78 +178,74 @@ public class BigTableReader extends SSTableReader
// is lesser than the first key of next interval (and in that case we must return the position of the first key
// of the next interval).
int i = 0;
- Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
- while (segments.hasNext())
+ String path = null;
+ try (FileDataInput in = ifile.createReader(sampledPosition))
{
- String path = null;
- try (FileDataInput in = segments.next())
+ path = in.getPath();
+ while (!in.isEOF())
{
- path = in.getPath();
- while (!in.isEOF())
- {
- i++;
+ i++;
- ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+ ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
- boolean opSatisfied; // did we find an appropriate position for the op requested
- boolean exactMatch; // is the current position an exact match for the key, suitable for caching
+ boolean opSatisfied; // did we find an appropriate position for the op requested
+ boolean exactMatch; // is the current position an exact match for the key, suitable for caching
- // Compare raw keys if possible for performance, otherwise compare decorated keys.
- if (op == Operator.EQ && i <= effectiveInterval)
- {
- opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
- }
- else
+ // Compare raw keys if possible for performance, otherwise compare decorated keys.
+ if (op == Operator.EQ && i <= effectiveInterval)
+ {
+ opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
+ }
+ else
+ {
+ DecoratedKey indexDecoratedKey = decorateKey(indexKey);
+ int comparison = indexDecoratedKey.compareTo(key);
+ int v = op.apply(comparison);
+ opSatisfied = (v == 0);
+ exactMatch = (comparison == 0);
+ if (v < 0)
{
- DecoratedKey indexDecoratedKey = decorateKey(indexKey);
- int comparison = indexDecoratedKey.compareTo(key);
- int v = op.apply(comparison);
- opSatisfied = (v == 0);
- exactMatch = (comparison == 0);
- if (v < 0)
- {
- Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
- return null;
- }
+ Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
+ return null;
}
+ }
- if (opSatisfied)
+ if (opSatisfied)
+ {
+ // read data position from index entry
+ RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in);
+ if (exactMatch && updateCacheAndStats)
{
- // read data position from index entry
- RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in);
- if (exactMatch && updateCacheAndStats)
- {
- assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
- DecoratedKey decoratedKey = (DecoratedKey)key;
+ assert key instanceof DecoratedKey; // key can be == to the index key only if it's a true row key
+ DecoratedKey decoratedKey = (DecoratedKey)key;
- if (logger.isTraceEnabled())
+ if (logger.isTraceEnabled())
+ {
+ // expensive sanity check! see CASSANDRA-4687
+ try (FileDataInput fdi = dfile.createReader(indexEntry.position))
{
- // expensive sanity check! see CASSANDRA-4687
- try (FileDataInput fdi = dfile.getSegment(indexEntry.position))
- {
- DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi));
- if (!keyInDisk.equals(key))
- throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
- }
+ DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi));
+ if (!keyInDisk.equals(key))
+ throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
}
-
- // store exact match for the key
- cacheKey(decoratedKey, indexEntry);
}
- if (op == Operator.EQ && updateCacheAndStats)
- bloomFilterTracker.addTruePositive();
- Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
- return indexEntry;
- }
- RowIndexEntry.Serializer.skip(in);
+ // store exact match for the key
+ cacheKey(decoratedKey, indexEntry);
+ }
+ if (op == Operator.EQ && updateCacheAndStats)
+ bloomFilterTracker.addTruePositive();
+ Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndex().size(), descriptor.generation);
+ return indexEntry;
}
+
+ RowIndexEntry.Serializer.skip(in);
}
- catch (IOException e)
- {
- markSuspect();
- throw new CorruptSSTableException(e, path);
- }
+ }
+ catch (IOException e)
+ {
+ markSuspect();
+ throw new CorruptSSTableException(e, path);
}
if (op == SSTableReader.Operator.EQ && updateCacheAndStats)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 77bf3d6..38dab9a 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -121,7 +121,6 @@ public class BigTableWriter extends SSTableWriter
if (logger.isTraceEnabled())
logger.trace("wrote {} at {}", decoratedKey, dataEnd);
iwriter.append(decoratedKey, index, dataEnd);
- dbuilder.addPotentialBoundary(dataEnd);
}
/**
@@ -420,7 +419,6 @@ public class BigTableWriter extends SSTableWriter
logger.trace("wrote index entry: {} at {}", indexEntry, indexStart);
summary.maybeAddEntry(key, indexStart, indexEnd, dataEnd);
- builder.addPotentialBoundary(indexStart);
}
/**